From 8a086a01663b03014d6b0cd326358d939a343116 Mon Sep 17 00:00:00 2001 From: jiayuezhangsc Date: Sat, 5 Nov 2016 22:12:56 -0700 Subject: [PATCH 1/2] [SPARK-18286][ML][WIP] Add Scala/Java/Python examples for MinHash and RandomProjection --- .../spark/examples/ml/JavaMinHashExample.java | 82 ++++++++++++++++++ .../ml/JavaRandomProjectionExample.java | 83 +++++++++++++++++++ .../spark/examples/ml/MinHashExample.scala | 65 +++++++++++++++ .../examples/ml/RandomProjectionExample.scala | 66 +++++++++++++++ 4 files changed, 296 insertions(+) create mode 100644 examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashExample.java create mode 100644 examples/src/main/java/org/apache/spark/examples/ml/JavaRandomProjectionExample.java create mode 100644 examples/src/main/scala/org/apache/spark/examples/ml/MinHashExample.scala create mode 100644 examples/src/main/scala/org/apache/spark/examples/ml/RandomProjectionExample.scala diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashExample.java new file mode 100644 index 000000000000..72d8549249c3 --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashExample.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.ml;
+
+// $example on$
+
+import org.apache.spark.ml.feature.MinHash;
+import org.apache.spark.ml.feature.MinHashModel;
+import org.apache.spark.ml.linalg.Vector;
+import org.apache.spark.ml.linalg.VectorUDT;
+import org.apache.spark.ml.linalg.Vectors;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Arrays;
+import java.util.List;
+// $example off$
+
+public class JavaMinHashExample {
+  public static void main(String[] args) {
+    SparkSession spark = SparkSession
+      .builder()
+      .appName("JavaMinHashExample")
+      .getOrCreate();
+
+    // $example on$
+    List<Row> data = Arrays.asList(
+      RowFactory.create(Vectors.sparse(100, new int[]{1, 3, 4}, new double[]{1.0, 1.0, 1.0})),
+      RowFactory.create(Vectors.sparse(100, new int[]{1, 3, 4}, new double[]{1.0, 1.0, 1.0})),
+      RowFactory.create(Vectors.sparse(100, new int[]{3, 4, 6}, new double[]{1.0, 1.0, 1.0})),
+      RowFactory.create(Vectors.sparse(100, new int[]{2, 8}, new double[]{1.0, 1.0}))
+    );
+    StructType schema = new StructType(new StructField[]{
+      new StructField("signatures", new VectorUDT(), false, Metadata.empty()),
+    });
+    Dataset<Row> dataset = spark.createDataFrame(data, schema);
+
+    MinHash minHash = new MinHash()
+      .setInputCol("signatures")
+      .setOutputCol("buckets")
+      .setOutputDim(2);
+    MinHashModel model = minHash.fit(dataset);
+
+    // Basic transformation: the model adds the "buckets" hash column to every row.
+    Dataset<Row> transformedDataset = model.transform(dataset);
+    transformedDataset.select("signatures", "buckets").show();
+
+    // Approximate nearest neighbor search: at most 3 rows of the dataset closest to the key.
+    Vector key = Vectors.sparse(100, new int[]{2, 3, 4}, new double[]{1.0, 1.0, 1.0});
+    model.approxNearestNeighbors(dataset, key, 3, false, "distance")
+      .select("signatures", "distance").show();
+
+    // Approximate similarity join of the dataset with a single-row dataset holding the key.
+    List<Row> dataToJoin = Arrays.asList(RowFactory.create(key));
+    Dataset<Row> datasetToJoin = spark.createDataFrame(dataToJoin, schema);
+    model.approxSimilarityJoin(dataset, datasetToJoin, 1)
+      .select("datasetA", "datasetB", "distCol").show();
+    // $example off$
+
+    spark.stop();
+  }
+}
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaRandomProjectionExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaRandomProjectionExample.java
new file mode 100644
index 000000000000..a78c1b7ed084
--- /dev/null
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaRandomProjectionExample.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.ml;
+
+// $example on$
+
+import org.apache.spark.ml.feature.RandomProjection;
+import org.apache.spark.ml.feature.RandomProjectionModel;
+import org.apache.spark.ml.linalg.Vector;
+import org.apache.spark.ml.linalg.VectorUDT;
+import org.apache.spark.ml.linalg.Vectors;
+import org.apache.spark.sql.Dataset;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.RowFactory;
+import org.apache.spark.sql.SparkSession;
+import org.apache.spark.sql.types.Metadata;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+import java.util.Arrays;
+import java.util.List;
+// $example off$
+
+public class JavaRandomProjectionExample {
+  public static void main(String[] args) {
+    SparkSession spark = SparkSession
+      .builder()
+      .appName("JavaRandomProjectionExample")
+      .getOrCreate();
+
+    // $example on$
+    List<Row> data = Arrays.asList(
+      RowFactory.create(Vectors.sparse(100, new int[]{1, 3, 4}, new double[]{1.0, 1.0, 1.0})),
+      RowFactory.create(Vectors.sparse(100, new int[]{1, 3, 4}, new double[]{1.0, 1.0, 1.0})),
+      RowFactory.create(Vectors.sparse(100, new int[]{3, 4, 6}, new double[]{1.0, 1.0, 1.0})),
+      RowFactory.create(Vectors.sparse(100, new int[]{2, 8}, new double[]{1.0, 1.0}))
+    );
+    StructType schema = new StructType(new StructField[]{
+      new StructField("signatures", new VectorUDT(), false, Metadata.empty()),
+    });
+    Dataset<Row> dataset = spark.createDataFrame(data, schema);
+
+    RandomProjection randomProjection = new RandomProjection()
+      .setInputCol("signatures")
+      .setOutputCol("results")
+      .setOutputDim(3)
+      .setBucketLength(2);
+    RandomProjectionModel model = randomProjection.fit(dataset);
+
+    // Basic transformation: the model adds the "results" hash column to every row.
+    Dataset<Row> transformedDataset = model.transform(dataset);
+    transformedDataset.select("signatures", "results").show();
+
+    // Approximate nearest neighbor search: at most 3 rows of the dataset closest to the key.
+    Vector key = Vectors.sparse(100, new int[]{1, 3, 4}, new double[]{1.0, 1.0, 1.0});
+    model.approxNearestNeighbors(dataset, key, 3, false, "distance")
+      .select("signatures", "distance").show();
+
+    // Approximate similarity join of the dataset with a single-row dataset holding the key.
+    List<Row> dataToJoin = Arrays.asList(RowFactory.create(key));
+    Dataset<Row> datasetToJoin = spark.createDataFrame(dataToJoin, schema);
+    model.approxSimilarityJoin(dataset, datasetToJoin, 1)
+      .select("datasetA", "datasetB", "distCol").show();
+    // $example off$
+
+    spark.stop();
+  }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/MinHashExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/MinHashExample.scala
new file mode 100644
index 000000000000..7b8365875680
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/MinHashExample.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.ml
+
+// $example on$
+import org.apache.spark.ml.feature.MinHash
+import org.apache.spark.ml.linalg.Vectors
+// $example off$
+import org.apache.spark.sql.SparkSession
+
+object MinHashExample {
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession
+      .builder
+      .appName("MinHashExample")
+      .getOrCreate()
+
+    // $example on$
+    val dataset = spark.createDataFrame(Seq(
+      (1, Vectors.sparse(100, Array(1, 3, 4), Array(1.0, 1.0, 1.0))),
+      (2, Vectors.sparse(100, Array(1, 3, 4), Array(1.0, 1.0, 1.0))),
+      (3, Vectors.sparse(100, Array(3, 4, 6), Array(1.0, 1.0, 1.0))),
+      (4, Vectors.sparse(100, Array(2, 8), Array(1.0, 1.0)))
+    )).toDF("id", "signatures")
+
+    val minHash = new MinHash()
+      .setInputCol("signatures")
+      .setOutputCol("buckets")
+      .setOutputDim(2)
+    val model = minHash.fit(dataset)
+
+    // Basic transformation: the model adds the "buckets" hash column to every row.
+    val transformedDataset = model.transform(dataset)
+    transformedDataset.select("id", "signatures", "buckets").show()
+
+    // Approximate nearest neighbor search: at most 3 rows of the dataset closest to the key.
+    val key = Vectors.sparse(100, Array(2, 3, 4), Array(1.0, 1.0, 1.0))
+    val approxNearestNeighbors = model.approxNearestNeighbors(dataset, key, 3, false, "distance")
+    approxNearestNeighbors.select("id", "signatures", "distance").show()
+
+    // Approximate similarity join of the dataset with a single-row dataset holding the key.
+    val datasetToJoin = spark.createDataFrame(Seq((5, key))).toDF("id", "signatures")
+    val approxSimilarityJoin = model.approxSimilarityJoin(dataset, datasetToJoin, 1)
+    approxSimilarityJoin.select("datasetA", "datasetB", "distCol").show()
+    // $example off$
+
+    spark.stop()
+  }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/RandomProjectionExample.scala b/examples/src/main/scala/org/apache/spark/examples/ml/RandomProjectionExample.scala
new file mode 100644
index 000000000000..13568acf599a
--- /dev/null
+++ 
b/examples/src/main/scala/org/apache/spark/examples/ml/RandomProjectionExample.scala
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// scalastyle:off println
+package org.apache.spark.examples.ml
+
+// $example on$
+import org.apache.spark.ml.feature.RandomProjection
+import org.apache.spark.ml.linalg.Vectors
+// $example off$
+import org.apache.spark.sql.SparkSession
+
+object RandomProjectionExample {
+  def main(args: Array[String]): Unit = {
+    val spark = SparkSession
+      .builder
+      .appName("RandomProjectionExample")
+      .getOrCreate()
+
+    // $example on$
+    val dataset = spark.createDataFrame(Seq(
+      (1, Vectors.sparse(100, Array(1, 3, 4), Array(1.0, 1.0, 1.0))),
+      (2, Vectors.sparse(100, Array(1, 3, 4), Array(1.0, 1.0, 1.0))),
+      (3, Vectors.sparse(100, Array(3, 4, 6), Array(1.0, 1.0, 1.0))),
+      (4, Vectors.sparse(100, Array(2, 8), Array(1.0, 1.0)))
+    )).toDF("id", "signatures")
+
+    val randomProjection = new RandomProjection()
+      .setInputCol("signatures")
+      .setOutputCol("results")
+      .setOutputDim(3)
+      .setBucketLength(2)
+    val model = randomProjection.fit(dataset)
+
+    // Basic transformation: the model adds the "results" hash column to every row.
+    val transformedDataset = model.transform(dataset)
+    transformedDataset.select("id", "signatures", "results").show()
+
+    // Approximate nearest neighbor search: at most 3 rows of the dataset closest to the key.
+    val key = Vectors.sparse(100, Array(1, 3, 4), Array(1.0, 1.0, 1.0))
+    val approxNearestNeighbors = model.approxNearestNeighbors(dataset, key, 3, false, "distance")
+    approxNearestNeighbors.select("id", "signatures", "distance").show()
+
+    // Approximate similarity join of the dataset with a single-row dataset holding the key.
+    val datasetToJoin = spark.createDataFrame(Seq((5, key))).toDF("id", "signatures")
+    val approxSimilarityJoin = model.approxSimilarityJoin(dataset, datasetToJoin, 1)
+    approxSimilarityJoin.select("datasetA", "datasetB", "distCol").show()
+    // $example off$
+
+    spark.stop()
+  }
+}

From 78593e529c768aa05147e66d0d8a9b96d9c1f592 Mon Sep 17 00:00:00 2001
From: bravo-zhang
Date: Mon, 7 Nov 2016 08:04:52 -0800
Subject: [PATCH 2/2] Remove Python from commit message

---
 .../java/org/apache/spark/examples/ml/JavaMinHashExample.java  | 1 -
 .../apache/spark/examples/ml/JavaRandomProjectionExample.java  | 1 -
 2 files changed, 2 deletions(-)

diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashExample.java
index 72d8549249c3..09b030dd50fe 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashExample.java
+++ b/examples/src/main/java/org/apache/spark/examples/ml/JavaMinHashExample.java
@@ -18,7 +18,6 @@
 package org.apache.spark.examples.ml;
 
 // $example on$
-
 import org.apache.spark.ml.feature.MinHash;
 import org.apache.spark.ml.feature.MinHashModel;
 import org.apache.spark.ml.linalg.Vector;
diff --git a/examples/src/main/java/org/apache/spark/examples/ml/JavaRandomProjectionExample.java b/examples/src/main/java/org/apache/spark/examples/ml/JavaRandomProjectionExample.java
index a78c1b7ed084..444ed18d42d9 100644
--- a/examples/src/main/java/org/apache/spark/examples/ml/JavaRandomProjectionExample.java
+++ 
b/examples/src/main/java/org/apache/spark/examples/ml/JavaRandomProjectionExample.java @@ -18,7 +18,6 @@ package org.apache.spark.examples.ml; // $example on$ - import org.apache.spark.ml.feature.RandomProjection; import org.apache.spark.ml.feature.RandomProjectionModel; import org.apache.spark.ml.linalg.Vector;