Commit 43770da

Merge branch 'master' of git://git.apache.org/spark into SPARK-4973

2 parents 88feecd + 06dc4b5
File tree: 7 files changed (+884, -913 lines)


core/src/main/scala/org/apache/spark/deploy/master/Master.scala
Lines changed: 20 additions & 19 deletions
@@ -720,26 +720,27 @@ private[spark] class Master(
   def rebuildSparkUI(app: ApplicationInfo): Boolean = {
     val appName = app.desc.name
     val notFoundBasePath = HistoryServer.UI_PATH_PREFIX + "/not-found"
-    val eventLogFile = app.desc.eventLogDir
-      .map { dir => EventLoggingListener.getLogPath(dir, app.id) }
-      .getOrElse {
-        // Event logging is not enabled for this application
-        app.desc.appUiUrl = notFoundBasePath
-        return false
-      }
-    val fs = Utils.getHadoopFileSystem(eventLogFile, hadoopConf)
+    try {
+      val eventLogFile = app.desc.eventLogDir
+        .map { dir => EventLoggingListener.getLogPath(dir, app.id) }
+        .getOrElse {
+          // Event logging is not enabled for this application
+          app.desc.appUiUrl = notFoundBasePath
+          return false
+        }
+
+      val fs = Utils.getHadoopFileSystem(eventLogFile, hadoopConf)

-    if (fs.exists(new Path(eventLogFile + EventLoggingListener.IN_PROGRESS))) {
-      // Event logging is enabled for this application, but the application is still in progress
-      val title = s"Application history not found (${app.id})"
-      var msg = s"Application $appName is still in progress."
-      logWarning(msg)
-      msg = URLEncoder.encode(msg, "UTF-8")
-      app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title"
-      return false
-    }
+      if (fs.exists(new Path(eventLogFile + EventLoggingListener.IN_PROGRESS))) {
+        // Event logging is enabled for this application, but the application is still in progress
+        val title = s"Application history not found (${app.id})"
+        var msg = s"Application $appName is still in progress."
+        logWarning(msg)
+        msg = URLEncoder.encode(msg, "UTF-8")
+        app.desc.appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title"
+        return false
+      }

-    try {
       val (logInput, sparkVersion) = EventLoggingListener.openEventLog(new Path(eventLogFile), fs)
       val replayBus = new ReplayListenerBus()
       val ui = SparkUI.createHistoryUI(new SparkConf, replayBus, new SecurityManager(conf),

@@ -758,7 +759,7 @@ private[spark] class Master(
       case fnf: FileNotFoundException =>
         // Event logging is enabled for this application, but no event logs are found
         val title = s"Application history not found (${app.id})"
-        var msg = s"No event logs found for application $appName in $eventLogFile."
+        var msg = s"No event logs found for application $appName in ${app.desc.eventLogDir}."
         logWarning(msg)
         msg += " Did you specify the correct logging directory?"
         msg = URLEncoder.encode(msg, "UTF-8")
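
The rebuilt-UI logic above routes each failure case to a "not found" page by packing a
human-readable message into the redirect URL. The sketch below isolates just that encoding
step with java.net.URLEncoder; it is not Spark's code, and the base path, application name,
and id are illustrative stand-ins (unlike the diff, this sketch also encodes the title).

import java.net.URLEncoder

// Minimal sketch of building a "not found" redirect URL with an encoded message.
// All literal values here are assumptions for illustration only.
object NotFoundUrlSketch {
  def main(args: Array[String]): Unit = {
    val notFoundBasePath = "/history/not-found"   // assumed base path
    val appName = "MyApp"                         // hypothetical application name
    var msg = s"Application $appName is still in progress."
    // Query parameters must be URL-encoded before they are appended to the path.
    msg = URLEncoder.encode(msg, "UTF-8")
    val title = URLEncoder.encode("Application history not found (app-001)", "UTF-8")
    val appUiUrl = notFoundBasePath + s"?msg=$msg&title=$title"
    println(appUiUrl)   // /history/not-found?msg=Application+MyApp+is+still+in+progress.&title=...
  }
}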

graphx/src/main/scala/org/apache/spark/graphx/GraphOps.scala
Lines changed: 26 additions & 0 deletions
@@ -278,6 +278,32 @@ class GraphOps[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]) extends Seriali
     retVal
   }

+  /**
+   * Convert bi-directional edges into uni-directional ones.
+   * Some graph algorithms (e.g., TriangleCount) assume that an input graph
+   * has its edges in canonical direction.
+   * This function rewrites the vertex ids of edges so that srcIds are smaller
+   * than dstIds, and merges the duplicated edges.
+   *
+   * @param mergeFunc the user-defined reduce function which should
+   * be commutative and associative and is used to combine the output
+   * of the map phase
+   *
+   * @return the resulting graph with canonical edges
+   */
+  def convertToCanonicalEdges(
+      mergeFunc: (ED, ED) => ED = (e1, e2) => e1): Graph[VD, ED] = {
+    val newEdges =
+      graph.edges
+        .map {
+          case e if e.srcId < e.dstId => ((e.srcId, e.dstId), e.attr)
+          case e => ((e.dstId, e.srcId), e.attr)
+        }
+        .reduceByKey(mergeFunc)
+        .map(e => new Edge(e._1._1, e._1._2, e._2))
+    Graph(graph.vertices, newEdges)
+  }
+
   /**
    * Execute a Pregel-like iterative vertex-parallel abstraction. The
    * user-defined vertex-program `vprog` is executed in parallel on
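
As a usage illustration of the new operator, the following sketch assumes a local
SparkContext and made-up vertex names and edge weights; it merges opposite-direction edges
by summing their attributes instead of relying on the default keep-first mergeFunc.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx._

// Sketch only: exercises convertToCanonicalEdges with a summing merge function.
object CanonicalEdgesExample {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("canonical-edges"))

    val vertices = sc.parallelize(Seq[(VertexId, String)]((1L, "a"), (2L, "b"), (3L, "c")))
    // Edge(1, 2, 3) and Edge(2, 1, 5) connect the same pair in opposite directions.
    val edges = sc.parallelize(Seq(Edge(1L, 2L, 3), Edge(2L, 1L, 5), Edge(3L, 2L, 2)))
    val graph = Graph(vertices, edges)

    // Duplicate edges between the same pair are combined by summing their attributes,
    // so the (1, 2) edge ends up with weight 3 + 5 = 8 and (3, 2) becomes (2, 3).
    val canonical = graph.convertToCanonicalEdges(_ + _)
    canonical.edges.collect().foreach(println)   // Edge(1,2,8) and Edge(2,3,2), in either order

    sc.stop()
  }
}

Because reduceByKey may combine duplicate edges in any order across partitions, mergeFunc
should be commutative and associative, as the Scaladoc above notes; summation satisfies both.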

graphx/src/test/scala/org/apache/spark/graphx/GraphOpsSuite.scala
Lines changed: 15 additions & 0 deletions
@@ -79,6 +79,21 @@ class GraphOpsSuite extends FunSuite with LocalSparkContext {
     }
   }

+  test ("convertToCanonicalEdges") {
+    withSpark { sc =>
+      val vertices =
+        sc.parallelize(Seq[(VertexId, String)]((1, "one"), (2, "two"), (3, "three")), 2)
+      val edges =
+        sc.parallelize(Seq(Edge(1, 2, 1), Edge(2, 1, 1), Edge(3, 2, 2)))
+      val g: Graph[String, Int] = Graph(vertices, edges)
+
+      val g1 = g.convertToCanonicalEdges()
+
+      val e = g1.edges.collect().toSet
+      assert(e === Set(Edge(1, 2, 1), Edge(2, 3, 2)))
+    }
+  }
+
   test("collectEdgesCycleDirectionOut") {
     withSpark { sc =>
       val graph = getCycleGraph(sc, 100)
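
To see why the assertion above expects exactly Set(Edge(1, 2, 1), Edge(2, 3, 2)), the
Spark-free trace below pushes the same three edges through the canonicalization steps by
hand; plain Scala collections stand in for the RDD, and groupBy plus reduce plays the role
of reduceByKey (illustration only, not part of the test suite).

// Trace of the test data: orient each edge low -> high, then merge duplicates.
object CanonicalTraceSketch {
  def main(args: Array[String]): Unit = {
    val edges = Seq((1L, 2L, 1), (2L, 1L, 1), (3L, 2L, 2))   // (srcId, dstId, attr)
    val canonical = edges
      .map { case (src, dst, attr) =>
        if (src < dst) ((src, dst), attr) else ((dst, src), attr)   // canonical orientation
      }
      .groupBy { case (pair, _) => pair }                           // collect duplicates of each pair
      .map { case (pair, dups) => pair -> dups.map(_._2).reduce((a, _) => a) }   // default mergeFunc keeps the first attr
    println(canonical.toSet)   // Set(((1,2),1), ((2,3),2)): the two edges the test asserts
  }
}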
