Skip to content

Commit 454c73d

Browse files
committed
Merge branch 'master' into SPARK-6263
2 parents a353354 + bf7e81a commit 454c73d

File tree

20 files changed

+1123
-132
lines changed

20 files changed

+1123
-132
lines changed

LICENSE

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -836,6 +836,22 @@ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
836836
See the License for the specific language governing permissions and
837837
limitations under the License.
838838

839+
========================================================================
840+
For vis.js (core/src/main/resources/org/apache/spark/ui/static/vis.min.js):
841+
========================================================================
842+
Copyright (C) 2010-2015 Almende B.V.
843+
844+
Vis.js is dual licensed under both
845+
846+
* The Apache 2.0 License
847+
http://www.apache.org/licenses/LICENSE-2.0
848+
849+
and
850+
851+
* The MIT License
852+
http://opensource.org/licenses/MIT
853+
854+
Vis.js may be distributed under either license.
839855

840856
========================================================================
841857
BSD-style licenses

core/src/main/resources/org/apache/spark/ui/static/timeline-view.css

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ div#application-timeline, div#job-timeline {
1919
margin-bottom: 30px;
2020
}
2121

22-
#application-timeline div.legend-area {
22+
#application-timeline div.legend-area,
23+
#job-timeline div.legend-area {
2324
margin-top: 5px;
2425
}
2526

core/src/main/resources/org/apache/spark/ui/static/timeline-view.js

Lines changed: 12 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -39,23 +39,24 @@ function drawApplicationTimeline(groupArray, eventObjArray, startTime) {
3939

4040
function setupJobEventAction() {
4141
$(".item.range.job.application-timeline-object").each(function() {
42-
var getJobId = function(baseElem) {
42+
var getSelectorForJobEntry = function(baseElem) {
4343
var jobIdText = $($(baseElem).find(".application-timeline-content")[0]).text();
4444
var jobId = jobIdText.match("\\(Job (\\d+)\\)")[1];
45-
return jobId;
45+
return "#job-" + jobId;
4646
};
4747

4848
$(this).click(function() {
49-
window.location.href = "job/?id=" + getJobId(this);
49+
var jobPagePath = $(getSelectorForJobEntry(this)).find("a").attr("href")
50+
window.location.href = jobPagePath
5051
});
5152

5253
$(this).hover(
5354
function() {
54-
$("#job-" + getJobId(this)).addClass("corresponding-item-hover");
55+
$(getSelectorForJobEntry(this)).addClass("corresponding-item-hover");
5556
$($(this).find("div.application-timeline-content")[0]).tooltip("show");
5657
},
5758
function() {
58-
$("#job-" + getJobId(this)).removeClass("corresponding-item-hover");
59+
$(getSelectorForJobEntry(this)).removeClass("corresponding-item-hover");
5960
$($(this).find("div.application-timeline-content")[0]).tooltip("hide");
6061
}
6162
);
@@ -97,32 +98,24 @@ function drawJobTimeline(groupArray, eventObjArray, startTime) {
9798

9899
function setupStageEventAction() {
99100
$(".item.range.stage.job-timeline-object").each(function() {
100-
var getStageIdAndAttempt = function(baseElem) {
101+
var getSelectorForStageEntry = function(baseElem) {
101102
var stageIdText = $($(baseElem).find(".job-timeline-content")[0]).text();
102103
var stageIdAndAttempt = stageIdText.match("\\(Stage (\\d+\\.\\d+)\\)")[1].split(".");
103-
return stageIdAndAttempt;
104+
return "#stage-" + stageIdAndAttempt[0] + "-" + stageIdAndAttempt[1];
104105
};
105106

106107
$(this).click(function() {
107-
var idAndAttempt = getStageIdAndAttempt(this);
108-
var id = idAndAttempt[0];
109-
var attempt = idAndAttempt[1];
110-
window.location.href = "../../stages/stage/?id=" + id + "&attempt=" + attempt;
108+
var stagePagePath = $(getSelectorForStageEntry(this)).find("a").attr("href")
109+
window.location.href = stagePagePath
111110
});
112111

113112
$(this).hover(
114113
function() {
115-
var idAndAttempt = getStageIdAndAttempt(this);
116-
var id = idAndAttempt[0];
117-
var attempt = idAndAttempt[1];
118-
$("#stage-" + id + "-" + attempt).addClass("corresponding-item-hover");
114+
$(getSelectorForStageEntry(this)).addClass("corresponding-item-hover");
119115
$($(this).find("div.job-timeline-content")[0]).tooltip("show");
120116
},
121117
function() {
122-
var idAndAttempt = getStageIdAndAttempt(this);
123-
var id = idAndAttempt[0];
124-
var attempt = idAndAttempt[1];
125-
$("#stage-" + id + "-" + attempt).removeClass("corresponding-item-hover");
118+
$(getSelectorForStageEntry(this)).removeClass("corresponding-item-hover");
126119
$($(this).find("div.job-timeline-content")[0]).tooltip("hide");
127120
}
128121
);

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1161,8 +1161,8 @@ abstract class RDD[T: ClassTag](
11611161
*/
11621162
@Experimental
11631163
def countApproxDistinct(p: Int, sp: Int): Long = withScope {
1164-
require(p >= 4, s"p ($p) must be at least 4")
1165-
require(sp <= 32, s"sp ($sp) cannot be greater than 32")
1164+
require(p >= 4, s"p ($p) must be >= 4")
1165+
require(sp <= 32, s"sp ($sp) must be <= 32")
11661166
require(sp == 0 || p <= sp, s"p ($p) cannot be greater than sp ($sp)")
11671167
val zeroCounter = new HyperLogLogPlus(p, sp)
11681168
aggregate(zeroCounter)(
@@ -1187,8 +1187,9 @@ abstract class RDD[T: ClassTag](
11871187
* It must be greater than 0.000017.
11881188
*/
11891189
def countApproxDistinct(relativeSD: Double = 0.05): Long = withScope {
1190+
require(relativeSD > 0.000017, s"accuracy ($relativeSD) must be greater than 0.000017")
11901191
val p = math.ceil(2.0 * math.log(1.054 / relativeSD) / math.log(2)).toInt
1191-
countApproxDistinct(p, 0)
1192+
countApproxDistinct(if (p < 4) 4 else p, 0)
11921193
}
11931194

11941195
/**

core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ class RDDSuite extends FunSuite with SharedSparkContext {
8989
val simpleRdd = sc.makeRDD(uniformDistro, 10)
9090
assert(error(simpleRdd.countApproxDistinct(8, 0), size) < 0.2)
9191
assert(error(simpleRdd.countApproxDistinct(12, 0), size) < 0.1)
92+
assert(error(simpleRdd.countApproxDistinct(0.02), size) < 0.1)
93+
assert(error(simpleRdd.countApproxDistinct(0.5), size) < 0.22)
9294
}
9395

9496
test("SparkContext.union") {

docs/streaming-programming-guide.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1915,7 +1915,7 @@ In that case, consider
19151915
[reducing](#reducing-the-processing-time-of-each-batch) the batch processing time.
19161916

19171917
The progress of a Spark Streaming program can also be monitored using the
1918-
[StreamingListener](api/scala/index.html#org.apache.spark.scheduler.StreamingListener) interface,
1918+
[StreamingListener](api/scala/index.html#org.apache.spark.streaming.scheduler.StreamingListener) interface,
19191919
which allows you to get receiver status and processing times. Note that this is a developer API
19201920
and it is likely to be improved upon (i.e., more information reported) in the future.
19211921

examples/src/main/scala/org/apache/spark/examples/mllib/LDAExample.scala

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ import scopt.OptionParser
2626
import org.apache.log4j.{Level, Logger}
2727

2828
import org.apache.spark.{SparkContext, SparkConf}
29-
import org.apache.spark.mllib.clustering.{DistributedLDAModel, LDA}
29+
import org.apache.spark.mllib.clustering.{EMLDAOptimizer, OnlineLDAOptimizer, DistributedLDAModel, LDA}
3030
import org.apache.spark.mllib.linalg.{Vector, Vectors}
3131
import org.apache.spark.rdd.RDD
3232

@@ -48,6 +48,7 @@ object LDAExample {
4848
topicConcentration: Double = -1,
4949
vocabSize: Int = 10000,
5050
stopwordFile: String = "",
51+
algorithm: String = "em",
5152
checkpointDir: Option[String] = None,
5253
checkpointInterval: Int = 10) extends AbstractParams[Params]
5354

@@ -78,6 +79,10 @@ object LDAExample {
7879
.text(s"filepath for a list of stopwords. Note: This must fit on a single machine." +
7980
s" default: ${defaultParams.stopwordFile}")
8081
.action((x, c) => c.copy(stopwordFile = x))
82+
opt[String]("algorithm")
83+
.text(s"inference algorithm to use. em and online are supported." +
84+
s" default: ${defaultParams.algorithm}")
85+
.action((x, c) => c.copy(algorithm = x))
8186
opt[String]("checkpointDir")
8287
.text(s"Directory for checkpointing intermediate results." +
8388
s" Checkpointing helps with recovery and eliminates temporary shuffle files on disk." +
@@ -128,7 +133,17 @@ object LDAExample {
128133

129134
// Run LDA.
130135
val lda = new LDA()
131-
lda.setK(params.k)
136+
137+
val optimizer = params.algorithm.toLowerCase match {
138+
case "em" => new EMLDAOptimizer
139+
// add (1.0 / actualCorpusSize) to MiniBatchFraction to be more robust on tiny datasets.
140+
case "online" => new OnlineLDAOptimizer().setMiniBatchFraction(0.05 + 1.0 / actualCorpusSize)
141+
case _ => throw new IllegalArgumentException(
142+
s"Only em, online are supported but got ${params.algorithm}.")
143+
}
144+
145+
lda.setOptimizer(optimizer)
146+
.setK(params.k)
132147
.setMaxIterations(params.maxIterations)
133148
.setDocConcentration(params.docConcentration)
134149
.setTopicConcentration(params.topicConcentration)
@@ -137,14 +152,18 @@ object LDAExample {
137152
sc.setCheckpointDir(params.checkpointDir.get)
138153
}
139154
val startTime = System.nanoTime()
140-
val ldaModel = lda.run(corpus).asInstanceOf[DistributedLDAModel]
155+
val ldaModel = lda.run(corpus)
141156
val elapsed = (System.nanoTime() - startTime) / 1e9
142157

143158
println(s"Finished training LDA model. Summary:")
144159
println(s"\t Training time: $elapsed sec")
145-
val avgLogLikelihood = ldaModel.logLikelihood / actualCorpusSize.toDouble
146-
println(s"\t Training data average log likelihood: $avgLogLikelihood")
147-
println()
160+
161+
if (ldaModel.isInstanceOf[DistributedLDAModel]) {
162+
val distLDAModel = ldaModel.asInstanceOf[DistributedLDAModel]
163+
val avgLogLikelihood = distLDAModel.logLikelihood / actualCorpusSize.toDouble
164+
println(s"\t Training data average log likelihood: $avgLogLikelihood")
165+
println()
166+
}
148167

149168
// Print the topics, showing the top-weighted terms for each topic.
150169
val topicIndices = ldaModel.describeTopics(maxTermsPerTopic = 10)

0 commit comments

Comments
 (0)