@@ -434,15 +434,182 @@ This example follows the simple text document `Pipeline` illustrated in the figu
 <div class="codetabs">
 
 <div data-lang="scala">
-{% include_example scala/org/apache/spark/examples/ml/CrossValidatorExample.scala %}
+{% highlight scala %}
+import org.apache.spark.ml.Pipeline
+import org.apache.spark.ml.classification.LogisticRegression
+import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
+import org.apache.spark.mllib.linalg.Vector
+import org.apache.spark.sql.Row
+
+// Prepare training documents from a list of (id, text, label) tuples.
+val training = sqlContext.createDataFrame(Seq(
+  (0L, "a b c d e spark", 1.0),
+  (1L, "b d", 0.0),
+  (2L, "spark f g h", 1.0),
+  (3L, "hadoop mapreduce", 0.0)
+)).toDF("id", "text", "label")
+
+// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
+val tokenizer = new Tokenizer()
+  .setInputCol("text")
+  .setOutputCol("words")
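+// HashingTF maps each word to a feature index by hashing and produces term-frequency feature vectors.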
+val hashingTF = new HashingTF()
+  .setNumFeatures(1000)
+  .setInputCol(tokenizer.getOutputCol)
+  .setOutputCol("features")
+val lr = new LogisticRegression()
+  .setMaxIter(10)
+  .setRegParam(0.01)
+val pipeline = new Pipeline()
+  .setStages(Array(tokenizer, hashingTF, lr))
+
+// Fit the pipeline to training documents.
+val model = pipeline.fit(training)
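+// The result is a PipelineModel: a Transformer whose stages are the fitted versions of the pipeline's stages.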
+
+// Prepare test documents, which are unlabeled (id, text) tuples.
+val test = sqlContext.createDataFrame(Seq(
+  (4L, "spark i j k"),
+  (5L, "l m n"),
+  (6L, "mapreduce spark"),
+  (7L, "apache hadoop")
+)).toDF("id", "text")
+
+// Make predictions on test documents.
+model.transform(test)
+  .select("id", "text", "probability", "prediction")
+  .collect()
+  .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
+    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
+  }
+
+{% endhighlight %}
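+
+The fitted `PipelineModel` keeps a fitted transformer for each stage. As a minimal sketch (an
+optional check, not required by the example above), the final stage can be cast back to a
+`LogisticRegressionModel` and inspected:
+
+{% highlight scala %}
+import org.apache.spark.ml.classification.LogisticRegressionModel
+
+// Illustrative only: the last stage of the fitted pipeline is the fitted logistic regression model.
+val lrModel = model.stages.last.asInstanceOf[LogisticRegressionModel]
+println(s"Fitted LogisticRegressionModel intercept: ${lrModel.intercept}")
+{% endhighlight %}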
 </div>
 
 <div data-lang="java">
-{% include_example java/org/apache/spark/examples/ml/JavaCrossValidatorExample.java %}
+{% highlight java %}
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.spark.ml.Pipeline;
+import org.apache.spark.ml.PipelineModel;
+import org.apache.spark.ml.PipelineStage;
+import org.apache.spark.ml.classification.LogisticRegression;
+import org.apache.spark.ml.feature.HashingTF;
+import org.apache.spark.ml.feature.Tokenizer;
+import org.apache.spark.sql.DataFrame;
+import org.apache.spark.sql.Row;
+
+// Labeled and unlabeled instance types.
+// Spark SQL can infer schema from Java Beans.
+public class Document implements Serializable {
+  private long id;
+  private String text;
+
+  public Document(long id, String text) {
+    this.id = id;
+    this.text = text;
+  }
+
+  public long getId() { return this.id; }
+  public void setId(long id) { this.id = id; }
+
+  public String getText() { return this.text; }
+  public void setText(String text) { this.text = text; }
+}
+
+public class LabeledDocument extends Document implements Serializable {
+  private double label;
+
+  public LabeledDocument(long id, String text, double label) {
+    super(id, text);
+    this.label = label;
+  }
+
+  public double getLabel() { return this.label; }
+  public void setLabel(double label) { this.label = label; }
+}
+
+// Prepare training documents, which are labeled.
+DataFrame training = sqlContext.createDataFrame(Arrays.asList(
+  new LabeledDocument(0L, "a b c d e spark", 1.0),
+  new LabeledDocument(1L, "b d", 0.0),
+  new LabeledDocument(2L, "spark f g h", 1.0),
+  new LabeledDocument(3L, "hadoop mapreduce", 0.0)
+), LabeledDocument.class);
+
+// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
+Tokenizer tokenizer = new Tokenizer()
+  .setInputCol("text")
+  .setOutputCol("words");
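+// HashingTF converts the token lists produced by the Tokenizer into term-frequency feature vectors.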
+HashingTF hashingTF = new HashingTF()
+  .setNumFeatures(1000)
+  .setInputCol(tokenizer.getOutputCol())
+  .setOutputCol("features");
+LogisticRegression lr = new LogisticRegression()
+  .setMaxIter(10)
+  .setRegParam(0.01);
+Pipeline pipeline = new Pipeline()
+  .setStages(new PipelineStage[] {tokenizer, hashingTF, lr});
+
+// Fit the pipeline to training documents.
+PipelineModel model = pipeline.fit(training);
+
+// Prepare test documents, which are unlabeled.
+DataFrame test = sqlContext.createDataFrame(Arrays.asList(
+  new Document(4L, "spark i j k"),
+  new Document(5L, "l m n"),
+  new Document(6L, "mapreduce spark"),
+  new Document(7L, "apache hadoop")
+), Document.class);
+
+// Make predictions on test documents.
+DataFrame predictions = model.transform(test);
+for (Row r: predictions.select("id", "text", "probability", "prediction").collect()) {
+  System.out.println("(" + r.get(0) + ", " + r.get(1) + ") --> prob=" + r.get(2)
+    + ", prediction=" + r.get(3));
+}
+
+{% endhighlight %}
 </div>
 
 <div data-lang="python">
-{% include_example python/ml/cross_validator.py %}
+{% highlight python %}
+from pyspark.ml import Pipeline
+from pyspark.ml.classification import LogisticRegression
+from pyspark.ml.feature import HashingTF, Tokenizer
+
+# Prepare training documents from a list of (id, text, label) tuples.
+training = sqlContext.createDataFrame([
+    (0L, "a b c d e spark", 1.0),
+    (1L, "b d", 0.0),
+    (2L, "spark f g h", 1.0),
+    (3L, "hadoop mapreduce", 0.0)], ["id", "text", "label"])
+
+# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
+tokenizer = Tokenizer(inputCol="text", outputCol="words")
+hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
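+# numFeatures is left at HashingTF's default here; the Scala and Java examples set it to 1000 explicitly.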
+lr = LogisticRegression(maxIter=10, regParam=0.01)
+pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
+
+# Fit the pipeline to training documents.
+model = pipeline.fit(training)
+
+# Prepare test documents, which are unlabeled (id, text) tuples.
+test = sqlContext.createDataFrame([
+    (4L, "spark i j k"),
+    (5L, "l m n"),
+    (6L, "mapreduce spark"),
+    (7L, "apache hadoop")], ["id", "text"])
+
+# Make predictions on test documents and print columns of interest.
+prediction = model.transform(test)
+selected = prediction.select("id", "text", "prediction")
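+# The "probability" column produced by LogisticRegression could also be selected here, as in the Scala and Java examples.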
+for row in selected.collect():
+    print(row)
+
+{% endhighlight %}
 </div>
 
 </div>