Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.spark.examples.mllib

import scala.language.reflectiveCalls

import scopt.OptionParser

import org.apache.spark.{SparkConf, SparkContext}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@

package org.apache.spark.examples.mllib

import org.apache.spark.SparkConf
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.clustering.StreamingKMeans
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
Expand All @@ -36,28 +36,28 @@ import org.apache.spark.streaming.{Seconds, StreamingContext}
* `(y,[x1,x2,x3,...,xn])`
* Where y is some identifier. n must be the same for train and test.
*
* Usage: StreamingKmeans <trainingDir> <testDir> <batchDuration> <numClusters> <numDimensions>
* Usage:
* StreamingKMeansExample <trainingDir> <testDir> <batchDuration> <numClusters> <numDimensions>
*
* To run on your local machine using the two directories `trainingDir` and `testDir`,
* with updates every 5 seconds, 2 dimensions per data point, and 3 clusters, call:
* $ bin/run-example \
* org.apache.spark.examples.mllib.StreamingKMeans trainingDir testDir 5 3 2
* $ bin/run-example mllib.StreamingKMeansExample trainingDir testDir 5 3 2
*
* As you add text files to `trainingDir` the clusters will continuously update.
* Anytime you add text files to `testDir`, you'll see predicted labels using the current model.
*
*/
object StreamingKMeans {
object StreamingKMeansExample {

def main(args: Array[String]) {
if (args.length != 5) {
System.err.println(
"Usage: StreamingKMeans " +
"Usage: StreamingKMeansExample " +
"<trainingDir> <testDir> <batchDuration> <numClusters> <numDimensions>")
System.exit(1)
}

val conf = new SparkConf().setMaster("local").setAppName("StreamingLinearRegression")
val conf = new SparkConf().setMaster("local").setAppName("StreamingKMeansExample")
val ssc = new StreamingContext(conf, Seconds(args(2).toLong))

val trainingData = ssc.textFileStream(args(0)).map(Vectors.parse)
Expand Down