diff --git a/docs/mllib-ann.md b/docs/mllib-ann.md
new file mode 100644
index 0000000000000..dfbe173ffbacb
--- /dev/null
+++ b/docs/mllib-ann.md
@@ -0,0 +1,239 @@
+---
+layout: global
+title: Artificial Neural Networks - MLlib
+displayTitle: MLlib - Artificial Neural Networks
+---
+
+# Introduction
+
+This document describes MLlib's Artificial Neural Network (ANN) implementation.
+
+The implementation currently consists of the following files:
+
+* `ArtificialNeuralNetwork.scala`: implements the ANN
+* `ANNSuite`: implements automated tests for the ANN and its gradient
+* `ANNDemo`: a demo that approximates three functions and shows a graphical representation of
+the result
+
+# Summary of usage
+
+The `ArtificialNeuralNetwork` object is used as an interface to the neural network. It is
+called as follows:
+
+```
+val annModel = ArtificialNeuralNetwork.train(rdd, hiddenLayersTopology, maxNumIterations)
+```
+
+where
+
+* `rdd` is an RDD of type (Vector,Vector), the first element containing the input vector and
+the second the associated output vector.
+* `hiddenLayersTopology` is an array of integers (Array[Int]) containing the number of
+nodes per hidden layer, starting with the layer that takes inputs from the input layer, and
+finishing with the layer that outputs to the output layer. The bias nodes are not counted.
+* `maxNumIterations` is an upper bound on the number of iterations to be performed.
+* `annModel` contains the trained ANN parameters, and can be used to calculate the ANN's
+approximation to arbitrary input values.
+
+The approximations can be calculated as follows:
+
+```
+val v_out = annModel.predict(v_in)
+```
+
+where `v_in` is either a Vector or an RDD of Vectors, and `v_out` is respectively a Vector or
+an RDD of (Vector,Vector) pairs, corresponding to input and output values.
+
+Further details and other calling options will be elaborated upon below.
+
+# Architecture and Notation
+
+The file ArtificialNeuralNetwork.scala implements the ANN. The following picture shows the
+architecture of a 3-layer ANN:
+
+```
+ +-------+
+ |       |
+ | N_0,0 |
+ |       |
+ +-------+          +-------+
+                    |       |
+ +-------+          | N_0,1 |          +-------+
+ |       |          |       |          |       |
+ | N_1,0 |-         +-------+        ->| N_0,2 |
+ |       | \  Wij1           /         |       |
+ +-------+  --      +-------+ --       +-------+
+          \         |       |   / Wjk2
+            :     ->| N_1,1 |-         +-------+
+            :       |       |          |       |
+            :       +-------+          | N_1,2 |
+            :                          |       |
+            :           :              +-------+
+            :           :
+            :           :                  :
+            :           :
+            :           :              +-------+
+            :           :              |       |
+            :           :              |N_K-1,2|
+            :                          |       |
+            :       +-------+          +-------+
+            :       |       |
+            :       |N_J-1,1|
+                    |       |
+ +-------+          +-------+
+ |       |
+ |N_I-1,0|
+ |       |
+ +-------+
+
+ +-------+          +--------+
+ |       |          |        |
+ |  -1   |          |   -1   |
+ |       |          |        |
+ +-------+          +--------+
+
+INPUT LAYER        HIDDEN LAYER        OUTPUT LAYER
+```
+
+The i-th node in layer l is denoted by N_{i,l}, with both i and l starting at 0. The weight
+between node i in layer l-1 and node j in layer l is denoted by W_{i,j,l} ("Wijl" in the
+picture above). Layer 0 is the input layer, whereas layer L is the output layer.
+
+The ANN also implements bias units. These are nodes that always output the value -1. The bias
+units are present in all layers except the output layer. They act like other nodes, but have
+no input.
+
+The value of node N_{j,l} is calculated as follows:
+
+`$N_{j,l} = g( \sum_{i=0}^{topology_{l-1}} W_{i,j,l} N_{i,l-1} )$`
+
+where g is the sigmoid function
+
+`$g(t) = \frac{e^{\beta t} }{1+e^{\beta t}}$`
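+
+To make the two formulas above concrete, the following sketch (not the actual MLlib code,
+which uses Breeze matrix operations internally) computes the activations of layer l from
+those of layer l-1 in plain Scala, treating the bias unit as an extra input fixed at -1:
+
+```
+// Sketch only: weights(j)(i) plays the role of W_{i,j,l}; the last entry of each
+// row is the weight of the bias unit, whose output is always -1.
+def g(t: Double): Double = 1.0 / (1.0 + math.exp(-t))  // sigmoid with beta = 1
+
+def layerOutput(prev: Array[Double], weights: Array[Array[Double]]): Array[Double] = {
+  val inputs = prev :+ -1.0  // append the bias unit
+  weights.map(row => g((row, inputs).zipped.map(_ * _).sum))
+}
+```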
+
+# LBFGS
+
+MLlib's ANN implementation uses the LBFGS optimisation algorithm for training. It minimises
+the following error function:
+
+`$E = \sum_{k=0}^{K-1} (N_{k,L} - Y_k)^2$`
+
+where Y_k is the target output given inputs N_{0,0} ... N_{I-1,0}.
+
+# Implementation Details
+
+## The "ArtificialNeuralNetwork" class
+
+The "ArtificialNeuralNetwork" class has the following constructor:
+
+```
+class ArtificialNeuralNetwork private(topology: Array[Int], maxNumIterations: Int,
+convergenceTol: Double)
+```
+
+* `topology` is an array of integers indicating the number of nodes per layer. For example, if
+`topology` holds (3, 5, 1), the network has three input nodes, five nodes in a single hidden
+layer and one output node.
+* `maxNumIterations` is the maximum number of iterations after which the LBFGS algorithm
+stops.
+* `convergenceTol` indicates the acceptable error; once it is reached, the LBFGS algorithm
+stops. A lower value of `convergenceTol` gives a higher precision.
+
+## The "ArtificialNeuralNetwork" object
+
+The object "ArtificialNeuralNetwork" is the interface to the "ArtificialNeuralNetwork" class.
+The object contains the training function. There are six overloads of the training function,
+each taking different parameters. All take as their first parameter the RDD `trainingRDD`,
+which contains pairs of input and output vectors.
+
+In addition, there are three functions for generating random weights. Two take a fixed seed,
+which is useful for testing when one wants to start with the same weights in every test.
+
+* `def train(trainingRDD: RDD[(Vector, Vector)], hiddenLayersTopology: Array[Int],
+maxNumIterations: Int): ArtificialNeuralNetworkModel`: starts training with random initial
+weights, and a default convergenceTol=1e-4.
+* `def train(trainingRDD: RDD[(Vector, Vector)], model: ArtificialNeuralNetworkModel,
+maxNumIterations: Int): ArtificialNeuralNetworkModel`: resumes training given an earlier
+calculated model, and a default convergenceTol=1e-4.
+* `def train(trainingRDD: RDD[(Vector,Vector)], hiddenLayersTopology: Array[Int],
+initialWeights: Vector, maxNumIterations: Int): ArtificialNeuralNetworkModel`: trains an ANN
+with given initial weights, and a default convergenceTol=1e-4.
+* `def train(trainingRDD: RDD[(Vector, Vector)], hiddenLayersTopology: Array[Int],
+maxNumIterations: Int, convergenceTol: Double): ArtificialNeuralNetworkModel`: starts training
+with random initial weights. Allows setting a customised `convergenceTol`.
+* `def train(trainingRDD: RDD[(Vector, Vector)], model: ArtificialNeuralNetworkModel,
+maxNumIterations: Int, convergenceTol: Double): ArtificialNeuralNetworkModel`: resumes training
+given an earlier calculated model. Allows setting a customised `convergenceTol`.
+* `def train(trainingRDD: RDD[(Vector,Vector)], hiddenLayersTopology: Array[Int],
+initialWeights: Vector, maxNumIterations: Int, convergenceTol: Double):
+ArtificialNeuralNetworkModel`: trains an ANN with given initial weights. Allows setting a
+customised `convergenceTol`.
+* `def randomWeights(trainingRDD: RDD[(Vector,Vector)], hiddenLayersTopology: Array[Int]):
+Vector`: generates a random weights vector.
+* `def randomWeights(trainingRDD: RDD[(Vector,Vector)], hiddenLayersTopology: Array[Int],
+seed: Int): Vector`: generates a random weights vector with the given seed.
+* `def randomWeights(inputLayerSize: Int, outputLayerSize: Int, hiddenLayersTopology: Array[Int],
+seed: Int): Vector`: generates a random weights vector, using the given random seed, input
+layer size, hidden layers topology and output layer size.
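+
+For example, the following sketch (assuming an existing SparkContext `sc`; the data is
+illustrative) trains a network with a customised convergence tolerance and then resumes
+training from the resulting model:
+
+```
+import org.apache.spark.mllib.ann.ArtificialNeuralNetwork
+import org.apache.spark.mllib.linalg.Vectors
+
+// hypothetical training set: approximate y = x*x on [0, 1]
+val trainingRDD = sc.parallelize((0 until 100).map { i =>
+  val x = i / 100.0
+  (Vectors.dense(x), Vectors.dense(x * x))
+})
+
+// one hidden layer of 5 nodes, at most 1000 iterations, convergenceTol = 1e-5
+val annModel = ArtificialNeuralNetwork.train(trainingRDD, Array[Int](5), 1000, 1e-5)
+
+// resume training from the returned model for at most 500 further iterations
+val refinedModel = ArtificialNeuralNetwork.train(trainingRDD, annModel, 500)
+
+val approximation = refinedModel.predict(Vectors.dense(0.5))
+```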
+
+Notice that the "hiddenLayersTopology" differs from the "topology" array. The
+"hiddenLayersTopology" does not include the number of nodes in the input and output layers;
+the number of nodes in those layers is deduced from the first element of the training RDD.
+For example, the "topology" array (3, 5, 7, 1) corresponds to the "hiddenLayersTopology"
+(5, 7), where the values 3 and 1 are deduced from the training data. The rationale for having
+these different arrays is that future methods may use a different mapping between input
+values and input nodes, or between output values and output nodes.
+
+## The "ArtificialNeuralNetworkModel" class
+
+All training functions return the trained ANN using the class "ArtificialNeuralNetworkModel".
+This class has the following functions:
+
+* `predict(testData: Vector): Vector` calculates the output vector given input vector
+"testData".
+* `predict(testData: RDD[Vector]): RDD[(Vector,Vector)]` returns (input, output) vector pairs,
+using the input vectors in "testData".
+
+The weights used by "predict" come from the model.
+
+## Training
+
+We have chosen to implement the ANN with LBFGS as the optimisation function, and compared it
+with Stochastic Gradient Descent. LBFGS was much faster, but correspondingly also starts
+overfitting earlier.
+
+The literature provides many different strategies for training an ANN, so it is important
+that the optimisation functions in MLlib's ANN are interchangeable. A new optimisation
+strategy can be implemented by creating a new class descending from ArtificialNeuralNetwork,
+and replacing the optimiser, updater and possibly the gradient as required.
+
+# Demo and tests
+
+Usage of MLlib's ANN is demonstrated through the "ANNDemo" demo program. The program generates
+three functions:
+
+* f2d: x -> y
+* f3d: (x,y) -> z
+* f4d: t -> (x,y,z)
+
+It calculates approximations of the target functions, and shows a graphical representation of
+the training set and of the results after applying the testing set.
+
+In addition, there are the following automated tests:
+
+* "ANN learns XOR function": tests that the ANN can properly approximate an XOR function.
+* "Gradient of ANN": tests that the output of the ANN gradient is roughly equal to an
+approximated gradient.
+
+# Conclusion
+
+The "ArtificialNeuralNetwork" class implements an Artificial Neural Network (ANN), trained
+with the LBFGS algorithm. It takes as input an RDD of input/output values of type
+"(Vector,Vector)", and returns an object of type "ArtificialNeuralNetworkModel" containing the
+parameters of the trained ANN. The "ArtificialNeuralNetworkModel" object can also be used to
+calculate results after training.
+
+The training of an ANN can be interrupted and later continued, allowing intermediate
+inspection of the results.
+
+A demo program and tests for the ANN are provided.
diff --git a/examples/src/main/scala/org/apache/spark/examples/ANNDemo.scala b/examples/src/main/scala/org/apache/spark/examples/ANNDemo.scala
new file mode 100644
index 0000000000000..dd981f90e9cff
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/ANNDemo.scala
@@ -0,0 +1,578 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.mllib + +import java.awt._ +import java.awt.event._ +import java.text.SimpleDateFormat +import java.util.Calendar + +import org.apache.spark._ +import org.apache.spark.mllib.ann._ +import org.apache.spark.mllib.linalg._ +import org.apache.spark.mllib.regression._ +import org.apache.spark.rdd.RDD + +import scala.Array.canBuildFrom +import scala.util.Random + +object windowAdapter extends WindowAdapter { + + override def windowClosing(e: WindowEvent) { + System.exit(0) + } + +} + +class OutputCanvas2D(wd: Int, ht: Int) extends Canvas { + + var points: Array[Vector] = null + var approxPoints: Array[Vector] = null + + /* input: rdd of (x,y) vectors */ + def setData(rdd: RDD[Vector]) { + points = rdd.collect + repaint + } + + def setApproxPoints(rdd: RDD[Vector]) { + approxPoints = rdd.collect + repaint + } + + def plotDot(g: Graphics, x: Int, y: Int) { + val r = 5 + val noSamp = 6*r + var x1 = x + var y1 = y + r + for(j <- 1 to noSamp) { + val x2 = (x.toDouble + math.sin(j.toDouble*2*math.Pi/noSamp)*r + .5).toInt + val y2 = (y.toDouble + math.cos(j.toDouble*2*math.Pi/noSamp)*r + .5).toInt + g.drawLine(x1, ht - y1, x2, ht - y2) + x1 = x2 + y1 = y2 + } + } + + override def paint(g: Graphics) = { + + var xmax: Double = 0.0 + var xmin: Double = 0.0 + var ymax: Double = 0.0 + var ymin: Double = 0.0 + + if(points!=null) { + + g.setColor(Color.black) + val x = points.map(T => (T.toArray)(0)) + val y = points.map(T => (T.toArray)(1)) + + xmax = x.max + xmin = x.min + ymax = y.max + ymin = y.min + + for(i <- 0 to x.size - 1) { + + val xr = (((x(i).toDouble - xmin)/(xmax - xmin))*wd + .5).toInt + val yr = (((y(i).toDouble - ymin)/(ymax - ymin))*ht + .5).toInt + plotDot(g, xr, yr) + + } + + if(approxPoints != null) { + + g.setColor(Color.red) + val x = approxPoints.map(T => (T.toArray)(0)) + val y = approxPoints.map(T => (T.toArray)(1)) + + for(i <- 0 to x.size-1) { + val xr = (((x(i).toDouble - xmin)/(xmax - xmin))*wd + .5).toInt + val yr = (((y(i).toDouble - ymin)/(ymax - ymin))*ht + .5).toInt + plotDot(g, xr, yr) + } + + } + + } + + } + +} + +class OutputFrame2D( title: String ) extends Frame( title ) { + + val wd = 800 + val ht = 600 + + var outputCanvas = new OutputCanvas2D( wd, ht ) + + def apply() { + addWindowListener(windowAdapter) + setSize(wd, ht) + add("Center", outputCanvas) + show() + } + + def setData(rdd: RDD[Vector]) { + outputCanvas.setData(rdd) + } + + def setApproxPoints(rdd: RDD[Vector]) { + outputCanvas.setApproxPoints(rdd) + } + + +} + +object windowAdapter3D extends WindowAdapter { + + override def windowClosing(e: WindowEvent) { + System.exit(0) + } + +} + +class OutputCanvas3D(wd: Int, ht: Int, shadowFrac: Double) extends Canvas { + + var points: Array[Vector] = null + var approxPoints: Array[Vector] = null + var angle: Double = 0.0 + + /* 3 dimensional (x,y,z) vector */ + def setData(rdd: RDD[Vector]) { + points = rdd.collect + repaint + } + + def setAngle(angle: Double) { + this.angle = angle + repaint + } + + + def setApproxPoints(rdd: RDD[Vector]) { + approxPoints = rdd.collect + repaint + } + + def plotDot(g: Graphics, 
x: Int, y: Int) {
+    val r = 5
+    val noSamp = 6*r
+    var x1 = x
+    var y1 = y + r
+    for( j <- 1 to noSamp ) {
+      val x2 = (x.toDouble + math.sin( j.toDouble*2*math.Pi/noSamp )*r + .5).toInt
+      val y2 = (y.toDouble + math.cos( j.toDouble*2*math.Pi/noSamp )*r + .5).toInt
+      g.drawLine(x1, ht - y1, x2, ht - y2)
+      x1 = x2
+      y1 = y2
+    }
+  }
+
+  def plotLine(g: Graphics, x1: Int, y1: Int, x2: Int, y2: Int) {
+    g.drawLine(x1, ht - y1, x2, ht - y2)
+  }
+
+  def calcCord(arr: Array[Double], angle: Double):
+      (Double, Double, Double, Double, Double, Double) = {
+
+    var arrOut = new Array[Double](6)
+
+    val x = arr(0)*math.cos(angle) - arr(1)*math.sin(angle)
+    val y = arr(0)*math.sin(angle) + arr(1)*math.cos(angle)
+    val z = arr(2)
+
+    val x0 = arr(0)*math.cos(angle) - arr(1)*math.sin(angle)
+    val y0 = arr(0)*math.sin(angle) + arr(1)*math.cos(angle)
+    val z0 = 0
+
+    val xs = (arr(0) + shadowFrac*arr(2))*math.cos(angle) - arr(1)*math.sin(angle)
+    val ys = (arr(0) + shadowFrac*arr(2))*math.sin(angle) + arr(1)*math.cos(angle)
+    val zs = 0
+
+    arrOut(0) = y - .5*x
+    arrOut(1) = z - .25*x
+
+    arrOut(2) = y0 - .5*x0
+    arrOut(3) = z0 - .25*x0
+
+    arrOut(4) = ys - .5*xs
+    arrOut(5) = zs - .25*xs
+
+    (arrOut(0), arrOut(1), arrOut(2), arrOut(3), arrOut(4), arrOut(5))
+
+  }
+
+  override def paint(g: Graphics) = {
+
+    if(points != null) {
+
+      var p = points.map(T => calcCord(T.toArray, angle)).toArray
+
+      var xmax = p(0)._1
+      var xmin = p(0)._1
+      var ymax = p(0)._2
+      var ymin = p(0)._2
+
+      for(i <- 0 to p.size - 1) {
+
+        if(xmax < p(i)._1) {
+          xmax = p(i)._1
+        }
+        if(xmax < p(i)._3) {
+          xmax = p(i)._3
+        }
+        if(xmax < p(i)._5) {
+          xmax = p(i)._5
+        }
+        if(xmin > p(i)._1) {
+          xmin = p(i)._1
+        }
+        if(xmin > p(i)._3) {
+          xmin = p(i)._3
+        }
+        if(xmin > p(i)._5) {
+          xmin = p(i)._5
+        }
+
+        if(ymax < p(i)._2) {
+          ymax = p(i)._2
+        }
+        if(ymax < p(i)._4) {
+          ymax = p(i)._4
+        }
+        if(ymax < p(i)._6) {
+          ymax = p(i)._6
+        }
+        if(ymin > p(i)._2) {
+          ymin = p(i)._2
+        }
+        if(ymin > p(i)._4) {
+          ymin = p(i)._4
+        }
+        if(ymin > p(i)._6) {
+          ymin = p(i)._6
+        }
+
+      }
+
+      for(i <- 0 to p.size - 1) {
+
+        var x_ = (((p(i)._1 - xmin)/(xmax - xmin))*(wd - 40) + 20.5).toInt
+        var y_ = (((p(i)._2 - ymin)/(ymax - ymin))*(ht - 40) + 20.5).toInt
+        var x0 = (((p(i)._3 - xmin)/(xmax - xmin))*(wd - 40) + 20.5).toInt
+        var y0 = (((p(i)._4 - ymin)/(ymax - ymin))*(ht - 40) + 20.5).toInt
+        var xs = (((p(i)._5 - xmin)/(xmax - xmin))*(wd - 40) + 20.5).toInt
+        var ys = (((p(i)._6 - ymin)/(ymax - ymin))*(ht - 40) + 20.5).toInt
+
+        g.setColor(Color.black)
+        plotDot(g, x_, y_)
+        plotLine(g, x_, y_, x0, y0)
+        g.setColor(Color.gray)
+        plotLine(g, x0, y0, xs, ys)
+
+      }
+
+      if(approxPoints != null) {
+
+        var p = approxPoints.map(T => calcCord(T.toArray, angle))
+
+        for(i <- 0 to p.size - 1) {
+
+          var x_ = (((p(i)._1 - xmin)/(xmax - xmin))*(wd - 40) + 20.5).toInt
+          var y_ = (((p(i)._2 - ymin)/(ymax - ymin))*(ht - 40) + 20.5).toInt
+          var x0 = (((p(i)._3 - xmin)/(xmax - xmin))*(wd - 40) + 20.5).toInt
+          var y0 = (((p(i)._4 - ymin)/(ymax - ymin))*(ht - 40) + 20.5).toInt
+          var xs = (((p(i)._5 - xmin)/(xmax - xmin))*(wd - 40) + 20.5).toInt
+          var ys = (((p(i)._6 - ymin)/(ymax - ymin))*(ht - 40) + 20.5).toInt
+
+          g.setColor(Color.red)
+          plotDot(g, x_, y_)
+          plotLine(g, x_, y_, x0, y0)
+          g.setColor(Color.magenta)
+          plotLine(g, x0, y0, xs, ys)
+
+        }
+
+      }
+
+    }
+  }
+}
+
+class OutputFrame3D(title: String, shadowFrac: Double) extends Frame(title) {
+
+  val wd = 800
+  val ht = 600
+
+  def this(title: String) = this(title, .25)
+
+  var outputCanvas = new OutputCanvas3D(wd, ht, shadowFrac)
+
+  def apply() {
+    addWindowListener(windowAdapter3D)
+    setSize(wd, ht)
+    add("Center", outputCanvas)
+    show()
+  }
+
+  def setData(rdd: RDD[Vector]) {
+    outputCanvas.setData(rdd)
+  }
+
+  def setAngle(angle: Double) {
+    outputCanvas.setAngle(angle)
+  }
+
+  def
setApproxPoints(rdd: RDD[Vector]) { + outputCanvas.setApproxPoints(rdd) + } + +} + +object ANNDemo { + + var rand = new Random(0) + + def generateInput2D(f: Double => Double, xmin: Double, xmax: Double, noPoints: Int): + Array[(Vector,Vector)] = + { + + var out = new Array[(Vector,Vector)](noPoints) + + for(i <- 0 to noPoints - 1) { + val x = xmin + rand.nextDouble()*(xmax - xmin) + val y = f(x) + out(i) = (Vectors.dense(x), Vectors.dense(y)) + } + + return out + + } + + + def generateInput3D(f: (Double,Double) => Double, xmin: Double, xmax: Double, + ymin: Double, ymax: Double, noPoints: Int): Array[(Vector,Vector)] = { + + var out = new Array[(Vector,Vector)](noPoints) + + for(i <- 0 to noPoints - 1) { + + val x = xmin + rand.nextDouble()*(xmax - xmin) + val y = ymin + rand.nextDouble()*(ymax - ymin) + val z = f(x, y) + + var arr = new Array[Double](2) + + arr(0) = x + arr(1) = y + out(i) = (Vectors.dense(arr), Vectors.dense(z)) + + } + + out + + } + + def generateInput4D(f: Double => (Double,Double,Double), + tmin: Double, tmax: Double, noPoints: Int): Array[(Vector,Vector)] = { + + var out = new Array[(Vector,Vector)](noPoints) + + for(i <- 0 to noPoints - 1) { + + val t: Double = tmin + rand.nextDouble()*(tmax - tmin) + var arr = new Array[Double](3) + var F = f(t) + + arr(0) = F._1 + arr(1) = F._2 + arr(2) = F._3 + + out(i) = (Vectors.dense(t), Vectors.dense(arr)) + } + + out + + } + + def f( T: Double ): Double = { + val y = 0.5 + Math.abs(T/5).toInt.toDouble*.15 + math.sin(T*math.Pi/10)*.1 + assert(y <= 1) + y + } + + def f3D(x: Double, y: Double): Double = { + .5 + .24*Math.sin(x*2*math.Pi/10) + .24*Math.cos(y*2*math.Pi/10) + } + + def f4D(t: Double): (Double, Double,Double) = { + val x = Math.abs(.8*Math.cos(t*2*math.Pi/20)) + .1 + val y = (11 + t)/22 + val z = .5 + .35*Math.sin(t*2*math.Pi/5)*Math.cos( t*2*math.Pi/10 ) + .15*t/11 + (x, y, z) + } + + def concat(v1: Vector, v2: Vector): Vector = { + + var a1 = v1.toArray + var a2 = v2.toArray + var a3 = new Array[Double](a1.size + a2.size) + + for(i <- 0 to a1.size - 1) { + a3(i) = a1(i) + } + + for(i <- 0 to a2.size - 1) { + a3(i + a1.size) = a2(i) + } + + Vectors.dense(a3) + + } + + def main(arg: Array[String]) { + + println("ANN demo") + println + + val formatter = new SimpleDateFormat("hh:mm:ss") + + var curAngle: Double = 0.0 + + var outputFrame2D: OutputFrame2D = null + var outputFrame3D: OutputFrame3D = null + var outputFrame4D: OutputFrame3D = null + + outputFrame2D = new OutputFrame2D("x -> y") + outputFrame2D.apply + + outputFrame3D = new OutputFrame3D("(x,y) -> z", 1) + outputFrame3D.apply + + outputFrame4D = new OutputFrame3D("t -> (x,y,z)") + outputFrame4D.apply + + var A = 20.0 + var B = 50.0 + + var conf = new SparkConf().setAppName("Parallel ANN").setMaster("local[1]") + var sc = new SparkContext(conf) + + val testRDD2D = + sc.parallelize(generateInput2D( T => f(T), -10, 10, 100 ), 2).cache + val testRDD3D = + sc.parallelize(generateInput3D((x,y) => f3D(x,y), -10, 10, -10, 10, 200 ), 2).cache + val testRDD4D = + sc.parallelize( generateInput4D( t => f4D(t), -10, 10, 100 ), 2 ).cache + + val validationRDD2D = + sc.parallelize(generateInput2D( T => f(T), -10, 10, 100 ), 2).cache + val validationRDD3D = + sc.parallelize(generateInput3D( (x,y) => f3D(x,y), -10, 10, -10, 10, 100 ), 2).cache + val validationRDD4D = + sc.parallelize( generateInput4D( t => f4D(t), -10, 10, 100 ), 2 ).cache + + outputFrame2D.setData( testRDD2D.map( T => concat( T._1, T._2 ) ) ) + outputFrame3D.setData( testRDD3D.map( T => concat( T._1, T._2 
) ) ) + outputFrame4D.setData( testRDD4D.map( T => T._2 ) ) + + var starttime = Calendar.getInstance().getTime() + println("Training 2D") + var model2D = ArtificialNeuralNetwork.train(testRDD2D, Array[Int](5, 3), 1000, 1e-8) + var stoptime = Calendar.getInstance().getTime() + println(((stoptime.getTime-starttime.getTime + 500) / 1000) + "s") + + starttime = stoptime + println("Training 3D") + var model3D = ArtificialNeuralNetwork.train(testRDD3D, Array[Int](20), 1000, 1e-8) + stoptime = Calendar.getInstance().getTime() + println(((stoptime.getTime-starttime.getTime + 500) / 1000) + "s") + + starttime = stoptime + println("Training 4D") + var model4D = ArtificialNeuralNetwork.train(testRDD4D, Array[Int](20), 1000, 1e-8) + stoptime = Calendar.getInstance().getTime() + println(((stoptime.getTime-starttime.getTime + 500) / 1000) + "s") + + val predictedAndTarget2D = validationRDD2D.map(T => (T._1, T._2, model2D.predict(T._1))) + val predictedAndTarget3D = validationRDD3D.map(T => (T._1, T._2, model3D.predict(T._1))) + val predictedAndTarget4D = validationRDD4D.map(T => (T._1, T._2, model4D.predict(T._1))) + + var err2D = predictedAndTarget2D.map( T => + (T._3.toArray(0) - T._2.toArray(0))*(T._3.toArray(0) - T._2.toArray(0)) + ).reduce((u,v) => u + v) + + var err3D = predictedAndTarget3D.map( T => + (T._3.toArray(0) - T._2.toArray(0))*(T._3.toArray(0) - T._2.toArray(0)) + ).reduce((u,v) => u + v) + + var err4D = predictedAndTarget4D.map(T => { + + val v1 = T._2.toArray + val v2 = T._3.toArray + + (v1(0) - v2(0)) * (v1(0) - v2(0)) + + (v1(1) - v2(1)) * (v1(1) - v2(1)) + + (v1(2) - v2(2)) * (v1(2) - v2(2)) + + }).reduce((u,v) => u + v) + + println("Error 2D/3D/4D: " + (err2D, err3D, err4D)) + + val predicted2D = predictedAndTarget2D.map( + T => concat(T._1, T._3) + ) + + val predicted3D = predictedAndTarget3D.map( + T => concat(T._1, T._3) + ) + + val predicted4D = predictedAndTarget4D.map( + T => T._3 + ) + + outputFrame2D.setApproxPoints(predicted2D) + outputFrame3D.setApproxPoints(predicted3D) + outputFrame4D.setApproxPoints(predicted4D) + + while(true) { // stops when closing the window + + curAngle = curAngle + math.Pi/8 + if(curAngle >= 2*math.Pi) { + curAngle = curAngle - 2*math.Pi + } + + outputFrame3D.setAngle(curAngle) + outputFrame4D.setAngle(curAngle) + + outputFrame3D.repaint + outputFrame4D.repaint + + Thread.sleep(3000) + + } + + sc.stop + + } + +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/ann/ArtificialNeuralNetwork.scala b/mllib/src/main/scala/org/apache/spark/mllib/ann/ArtificialNeuralNetwork.scala new file mode 100644 index 0000000000000..231597d8c1997 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/ann/ArtificialNeuralNetwork.scala @@ -0,0 +1,551 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.mllib.ann
+
+import breeze.linalg.{axpy => brzAxpy, Vector => BV, DenseVector => BDV,
+  DenseMatrix => BDM, sum => Bsum, argmax => Bargmax, norm => Bnorm, *}
+import breeze.numerics.{sigmoid => Bsigmoid}
+import org.apache.spark.annotation.Experimental
+import org.apache.spark.mllib.linalg
+
+import org.apache.spark.mllib.linalg.{DenseMatrix, DenseVector, Vector, Vectors}
+import org.apache.spark.mllib.optimization._
+import org.apache.spark.rdd.RDD
+import org.apache.spark.util.random.XORShiftRandom
+
+/*
+ * Implements an Artificial Neural Network (ANN)
+ *
+ * The data consists of an input vector and an output vector, combined into a single vector
+ * as follows:
+ *
+ * [ ---input--- ---output--- ]
+ *
+ * NOTE: output values should be in the range [0,1]
+ *
+ * For a network of H hidden layers:
+ *
+ * hiddenLayersTopology(h) indicates the number of nodes in hidden layer h, excluding the bias
+ * node. h counts from 0 (first hidden layer, taking inputs from the input layer) to H - 1 (last
+ * hidden layer, sending outputs to the output layer).
+ *
+ * hiddenLayersTopology is converted internally to topology, which adds the number of nodes
+ * in the input and output layers.
+ *
+ * noInput = topology(0), the number of input nodes
+ * noOutput = topology(L-1), the number of output nodes
+ *
+ * input = data( 0 to noInput-1 )
+ * output = data( noInput to noInput + noOutput - 1 )
+ *
+ * W_ijl is the weight from node i in layer l-1 to node j in layer l
+ * W_ijl goes to position ofsWeight(l) + j*(topology(l-1)+1) + i in the weights vector
+ *
+ * B_jl is the bias input of node j in layer l
+ * B_jl goes to position ofsWeight(l) + j*(topology(l-1)+1) + topology(l-1) in the weights vector
+ *
+ * error function: E( O, Y ) = 1/2 * sum( (O_j - Y_j)^2 )
+ * (with O = (O_0, ..., O_(noOutput-1)) the output of the ANN,
+ * and Y = (Y_0, ..., Y_(noOutput-1)) the target output)
+ *
+ * node_jl is node j in layer l
+ * node_jl goes to position ofsNode(l) + j
+ *
+ * The weights gradient is defined as dE/dW_ijl and dE/dB_jl
+ * It has the same mapping as W_ijl and B_jl
+ *
+ * For back propagation:
+ * delta_jl = dE/dS_jl, where S_jl is the output of node_jl before applying the sigmoid;
+ * delta_jl has the same mapping as node_jl
+ *
+ * Here E = 1/2 * ((estOutput-output),(estOutput-output)), half the inner product of the
+ * difference between estimation and target output with itself.
+ */
+
+/**
+ * Artificial neural network (ANN) model
+ *
+ * @param weights the weights between the neurons in the ANN.
+ * @param topology array containing the number of nodes per layer in the network, including
+ * the nodes in the input and output layer, but excluding the bias nodes.
+ */
+class ArtificialNeuralNetworkModel private[mllib](val weights: Vector, val topology: Array[Int])
+  extends Serializable with NeuralHelper {
+
+  val (weightMatrices, bias) = unrollWeights(weights)
+
+  /**
+   * Predicts values for a single data point using the trained model.
+   *
+   * @param testData represents a single data point.
+   * @return prediction using the trained model.
+   */
+  def predict(testData: Vector): Vector = {
+    Vectors.dense(computeValues(testData, topology.length - 1))
+  }
+
+  /**
+   * Predicts values for an RDD of data points using the trained model.
+   *
+   * @param testDataRDD RDD representing the input vectors.
+   * @return RDD with predictions using the trained model as (input, output) pairs.
+   */
+  def predict(testDataRDD: RDD[Vector]): RDD[(Vector, Vector)] = {
+    testDataRDD.map(T => (T, predict(T)))
+  }
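+
+  /**
+   * Runs a forward pass for a single data point and returns the activations of the
+   * requested layer (layer 0 returns the input values themselves).
+   */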
+  private def computeValues(testData: Vector, layer: Int): Array[Double] = {
+    require(layer >= 0 && layer < topology.length)
+    /* TODO: BDM */
+    val outputs = forwardRun(testData.toBreeze.toDenseVector.toDenseMatrix.t, weightMatrices, bias)
+    outputs(layer).toArray
+  }
+
+  /**
+   * Returns the output values of a given layer for a single data point using the trained model.
+   *
+   * @param testData represents a single data point.
+   * @param layer index of a network layer.
+   * @return output of the given layer.
+   */
+  def output(testData: Vector, layer: Int): Vector = {
+    Vectors.dense(computeValues(testData, layer))
+  }
+
+  /**
+   * Returns the weights for a given layer in vector form.
+   *
+   * @param index index of a layer: ranges from 1 until topology.length
+   * (there are no weights for layer 0).
+   * @return weights.
+   */
+  def weightsByLayer(index: Int): Vector = {
+    require(index > 0 && index < topology.length)
+    val layerWeight = BDV.vertcat(weightMatrices(index).toDenseVector, bias(index).toDenseVector)
+    Vectors.dense(layerWeight.toArray)
+  }
+}
+
+/**
+ * Performs the training of an Artificial Neural Network (ANN)
+ *
+ * @param topology A vector containing the number of nodes per layer in the network, including
+ * the nodes in the input and output layer, but excluding the bias nodes.
+ * @param maxNumIterations The maximum number of iterations for the training phase.
+ * @param convergenceTol Convergence tolerance for LBFGS. Smaller value for closer convergence.
+ * @param batchSize number of (input, output) pairs that are packed into a single data record.
+ */
+class ArtificialNeuralNetwork private[mllib](
+    topology: Array[Int],
+    maxNumIterations: Int,
+    convergenceTol: Double,
+    batchSize: Int = 1)
+  extends Serializable {
+
+  private val gradient = new ANNLeastSquaresGradient(topology, batchSize)
+  private val updater = new ANNUpdater()
+  private val optimizer = new LBFGS(gradient, updater).
+    setConvergenceTol(convergenceTol).
+    setNumIterations(maxNumIterations)
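+
+  /*
+   * Note on the data layout handed to the optimizer: each record is a (label, features)
+   * pair whose label is unused (fixed at 0.0). With batchSize == 1 the features vector is
+   * the input and output vectors concatenated; with a larger batchSize, the inputs of all
+   * instances in the batch are packed first, followed by all corresponding outputs.
+   */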
+  /**
+   * Trains the ANN model.
+   *
+   * @param trainingRDD RDD containing (input, output) pairs for training.
+   * @param initialWeights the initial weights of the ANN.
+   * @return ANN model.
+   */
+  private def run(trainingRDD: RDD[(Vector, Vector)], initialWeights: Vector):
+      ArtificialNeuralNetworkModel = {
+    val data = if (batchSize == 1) {
+      trainingRDD.map(v =>
+        (0.0,
+          Vectors.fromBreeze(BDV.vertcat(
+            v._1.toBreeze.toDenseVector,
+            v._2.toBreeze.toDenseVector))
+        ))
+    } else {
+      trainingRDD.mapPartitions { it =>
+        it.grouped(batchSize).map { seq =>
+          val size = seq.size
+          val bigVector = new Array[Double](topology(0) * size + topology.last * size)
+          var i = 0
+          seq.foreach { case (in, out) =>
+            System.arraycopy(in.toArray, 0, bigVector, i * topology(0), topology(0))
+            System.arraycopy(out.toArray, 0, bigVector,
+              topology(0) * size + i * topology.last, topology.last)
+            i += 1
+          }
+          (0.0, Vectors.dense(bigVector))
+        }
+      }
+    }
+    val weights = optimizer.optimize(data, initialWeights)
+    new ArtificialNeuralNetworkModel(weights, topology)
+  }
+}
+
+/**
+ * Top level methods for training the artificial neural network (ANN)
+ */
+object ArtificialNeuralNetwork {
+
+  private val defaultTolerance: Double = 1e-4
+
+  /**
+   * Trains an ANN with the given batch size and initial weights.
+   *
+   * @param trainingRDD RDD containing (input, output) pairs for training.
+   * @param batchSize number of (input, output) pairs that are processed as one record.
+   * @param hiddenLayersTopology number of nodes per hidden layer, excluding the bias nodes.
+   * @param initialWeights initial weights vector.
+   * @param maxNumIterations maximum number of training iterations.
+   * @param convergenceTol convergence tolerance for LBFGS. Smaller value for closer convergence.
+   * @return ANN model.
+   */
+  def train(trainingRDD: RDD[(Vector, Vector)],
+      batchSize: Int,
+      hiddenLayersTopology: Array[Int],
+      initialWeights: Vector,
+      maxNumIterations: Int,
+      convergenceTol: Double): ArtificialNeuralNetworkModel = {
+    val topology = convertTopology(trainingRDD, hiddenLayersTopology)
+    new ArtificialNeuralNetwork(topology, maxNumIterations, convergenceTol, batchSize).
+      run(trainingRDD, initialWeights)
+  }
+
+  /**
+   * Trains an ANN with the given batch size, starting from random initial weights.
+   * Uses default convergence tolerance 1e-4 for LBFGS.
+   *
+   * @param trainingRDD RDD containing (input, output) pairs for training.
+   * @param batchSize number of (input, output) pairs that are processed as one record.
+   * @param hiddenLayersTopology number of nodes per hidden layer, excluding the bias nodes.
+   * @param maxNumIterations maximum number of training iterations.
+   * @return ANN model.
+   */
+  def train(trainingRDD: RDD[(Vector, Vector)],
+      batchSize: Int,
+      hiddenLayersTopology: Array[Int],
+      maxNumIterations: Int): ArtificialNeuralNetworkModel = {
+    val topology = convertTopology(trainingRDD, hiddenLayersTopology)
+    new ArtificialNeuralNetwork(topology, maxNumIterations, defaultTolerance, batchSize).
+      run(trainingRDD, randomWeights(topology, false))
+  }
+
+  /**
+   * Trains an ANN.
+   * Uses default convergence tolerance 1e-4 for LBFGS.
+   *
+   * @param trainingRDD RDD containing (input, output) pairs for training.
+   * @param hiddenLayersTopology number of nodes per hidden layer, excluding the bias nodes.
+   * @param maxNumIterations specifies maximum number of training iterations.
+   * @return ANN model.
+   */
+  def train(trainingRDD: RDD[(Vector, Vector)],
+      hiddenLayersTopology: Array[Int],
+      maxNumIterations: Int): ArtificialNeuralNetworkModel = {
+    train(trainingRDD, hiddenLayersTopology, maxNumIterations, defaultTolerance)
+  }
+
+  /**
+   * Continues training of an ANN.
+   * Uses default convergence tolerance 1e-4 for LBFGS.
+   *
+   * @param trainingRDD RDD containing (input, output) pairs for training.
+   * @param model model of an already partly trained ANN.
+   * @param maxNumIterations maximum number of training iterations.
+   * @return ANN model.
+   */
+  def train(trainingRDD: RDD[(Vector, Vector)],
+      model: ArtificialNeuralNetworkModel,
+      maxNumIterations: Int): ArtificialNeuralNetworkModel = {
+    train(trainingRDD, model, maxNumIterations, defaultTolerance)
+  }
+
+  /**
+   * Trains an ANN with given initial weights.
+   * Uses default convergence tolerance 1e-4 for LBFGS.
+   *
+   * @param trainingRDD RDD containing (input, output) pairs for training.
+   * @param hiddenLayersTopology number of nodes per hidden layer, excluding the bias nodes.
+   * @param initialWeights initial weights vector.
+   * @param maxNumIterations maximum number of training iterations.
+   * @return ANN model.
+   */
+  def train(trainingRDD: RDD[(Vector, Vector)],
+      hiddenLayersTopology: Array[Int],
+      initialWeights: Vector,
+      maxNumIterations: Int): ArtificialNeuralNetworkModel = {
+    train(trainingRDD, hiddenLayersTopology, initialWeights, maxNumIterations, defaultTolerance)
+  }
+
+  /**
+   * Continues training of an ANN using a customised convergence tolerance.
+   *
+   * @param trainingRDD RDD containing (input, output) pairs for training.
+   * @param model model of an already partly trained ANN.
+   * @param maxNumIterations maximum number of training iterations.
+   * @param convergenceTol convergence tolerance for LBFGS. Smaller value for closer convergence.
+   * @return ANN model.
+   */
+  def train(trainingRDD: RDD[(Vector, Vector)],
+      model: ArtificialNeuralNetworkModel,
+      maxNumIterations: Int,
+      convergenceTol: Double): ArtificialNeuralNetworkModel = {
+    new ArtificialNeuralNetwork(model.topology, maxNumIterations, convergenceTol).
+      run(trainingRDD, model.weights)
+  }
+
+  /**
+   * Trains an ANN using a customised convergence tolerance, starting from random initial
+   * weights.
+   *
+   * @param trainingRDD RDD containing (input, output) pairs for training.
+   * @param hiddenLayersTopology number of nodes per hidden layer, excluding the bias nodes.
+   * @param maxNumIterations maximum number of training iterations.
+   * @param convergenceTol convergence tolerance for LBFGS. Smaller value for closer convergence.
+   * @return ANN model.
+   */
+  def train(trainingRDD: RDD[(Vector, Vector)],
+      hiddenLayersTopology: Array[Int],
+      maxNumIterations: Int,
+      convergenceTol: Double): ArtificialNeuralNetworkModel = {
+    val topology = convertTopology(trainingRDD, hiddenLayersTopology)
+    new ArtificialNeuralNetwork(topology, maxNumIterations, convergenceTol).
+      run(trainingRDD, randomWeights(topology, false))
+  }
+
+  /**
+   * Trains an ANN with given initial weights using a customised convergence tolerance.
+   *
+   * @param trainingRDD RDD containing (input, output) pairs for training.
+   * @param hiddenLayersTopology number of nodes per hidden layer, excluding the bias nodes.
+   * @param initialWeights initial weights vector.
+   * @param maxNumIterations maximum number of training iterations.
+   * @param convergenceTol convergence tolerance for LBFGS. Smaller value for closer convergence.
+   * @return ANN model.
+   */
+  def train(trainingRDD: RDD[(Vector, Vector)],
+      hiddenLayersTopology: Array[Int],
+      initialWeights: Vector,
+      maxNumIterations: Int,
+      convergenceTol: Double): ArtificialNeuralNetworkModel = {
+    val topology = convertTopology(trainingRDD, hiddenLayersTopology)
+    new ArtificialNeuralNetwork(topology, maxNumIterations, convergenceTol).
+      run(trainingRDD, initialWeights)
+  }
+
+  /**
+   * Provides a random weights vector.
+   *
+   * @param trainingRDD RDD containing (input, output) pairs for training.
+   * @param hiddenLayersTopology number of nodes per hidden layer, excluding the bias nodes.
+   * @return random weights vector.
+   */
+  def randomWeights(trainingRDD: RDD[(Vector, Vector)],
+      hiddenLayersTopology: Array[Int]): Vector = {
+    val topology = convertTopology(trainingRDD, hiddenLayersTopology)
+    randomWeights(topology, false)
+  }
+
+  /**
+   * Provides a random weights vector, using given random seed.
+   *
+   * @param trainingRDD RDD containing (input, output) pairs for later training.
+   * @param hiddenLayersTopology number of nodes per hidden layer, excluding the bias nodes.
+   * @param seed random generator seed.
+   * @return random weights vector.
+ */ + def randomWeights(trainingRDD: RDD[(Vector,Vector)], + hiddenLayersTopology: Array[Int], + seed: Int): Vector = { + val topology = convertTopology(trainingRDD, hiddenLayersTopology) + return randomWeights(topology, true, seed) + } + + /** + * Provides a random weights vector, using given random seed. + * + * @param inputLayerSize size of input layer. + * @param outputLayerSize size of output layer. + * @param hiddenLayersTopology number of nodes per hidden layer, excluding the bias nodes. + * @param seed random generator seed. + * @return random weights vector. + */ + def randomWeights(inputLayerSize: Int, + outputLayerSize: Int, + hiddenLayersTopology: Array[Int], + seed: Int): Vector = { + val topology = inputLayerSize +: hiddenLayersTopology :+ outputLayerSize + return randomWeights(topology, true, seed) + } + + private def convertTopology(input: RDD[(Vector,Vector)], + hiddenLayersTopology: Array[Int] ): Array[Int] = { + val firstElt = input.first + firstElt._1.size +: hiddenLayersTopology :+ firstElt._2.size + } + + private def randomWeights(topology: Array[Int], useSeed: Boolean, seed: Int = 0): Vector = { + val rand: XORShiftRandom = + if( useSeed == false ) new XORShiftRandom() else new XORShiftRandom(seed) + var i: Int = 0 + var l: Int = 0 + val noWeights = { + var tmp = 0 + var i = 1 + while (i < topology.size) { + tmp = tmp + topology(i) * (topology(i - 1) + 1) + i += 1 + } + tmp + } + val initialWeightsArr = new Array[Double](noWeights) + var pos = 0 + l = 1 + while (l < topology.length) { + i = 0 + while (i < (topology(l) * (topology(l - 1) + 1))) { + initialWeightsArr(pos) = (rand.nextDouble * 4.8 - 2.4) / (topology(l - 1) + 1) + pos += 1 + i += 1 + } + l += 1 + } + Vectors.dense(initialWeightsArr) + } +} + + +/** + * ::Experimental:: + * Trait for roll/unroll weights and forward/back propagation in neural network + */ +@Experimental +private[ann] trait NeuralHelper { + protected val topology: Array[Int] + protected val weightCount = + (for(i <- 1 until topology.size) yield (topology(i) * topology(i - 1))).sum + + topology.sum - topology(0) + + protected def unrollWeights(weights: linalg.Vector): (Array[BDM[Double]], Array[BDV[Double]]) = { + require(weights.size == weightCount) + val weightsCopy = weights.toArray + val weightMatrices = new Array[BDM[Double]](topology.size) + val bias = new Array[BDV[Double]](topology.size) + var offset = 0 + for(i <- 1 until topology.size){ + weightMatrices(i) = new BDM[Double](topology(i), topology(i - 1), weightsCopy, offset) + offset += topology(i) * topology(i - 1) + /* TODO: BDM */ + bias(i) = new BDV[Double](weightsCopy, offset, 1, topology(i)) + offset += topology(i) + } + (weightMatrices, bias) + } + + protected def rollWeights(weightMatricesUpdate: Array[BDM[Double]], + biasUpdate: Array[BDV[Double]], + cumGradient: Vector): Unit = { + val wu = cumGradient.toArray + var offset = 0 + for(i <- 1 until topology.length){ + var k = 0 + val numElements = topology(i) * topology(i - 1) + while(k < numElements){ + wu(offset + k) += weightMatricesUpdate(i).data(k) + k += 1 + } + offset += numElements + k = 0 + while(k < topology(i)){ + wu(offset + k) += biasUpdate(i).data(k) + k += 1 + } + offset += topology(i) + } + } + + protected def forwardRun(data: BDM[Double], weightMatrices: Array[BDM[Double]], + bias: Array[BDV[Double]]): Array[BDM[Double]] = { + val outArray = new Array[BDM[Double]](topology.size) + outArray(0) = data + for(i <- 1 until topology.size) { + outArray(i) = weightMatrices(i) * outArray(i - 1)// :+ bias(i)) + 
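+      // broadcast-add the bias of layer i to every column (one column per batch instance)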
outArray(i)(::, *) :+= bias(i) + Bsigmoid.inPlace(outArray(i)) + } + outArray + } + + protected def wGradient(weightMatrices: Array[BDM[Double]], + targetOutput: BDM[Double], + outputs: Array[BDM[Double]]): + (Array[BDM[Double]], Array[BDV[Double]]) = { + /* error back propagation */ + val deltas = new Array[BDM[Double]](topology.size) + val avgDeltas = new Array[BDV[Double]](topology.size) + for(i <- (topology.size - 1) until (0, -1)){ + /* TODO: GEMM? */ + val outPrime = BDM.ones[Double](outputs(i).rows, outputs(i).cols) + outPrime :-= outputs(i) + outPrime :*= outputs(i) + if(i == topology.size - 1){ + deltas(i) = (outputs(i) :- targetOutput) :* outPrime + }else{ + deltas(i) = (weightMatrices(i + 1).t * deltas(i + 1)) :* outPrime + } + avgDeltas(i) = Bsum(deltas(i)(*, ::)) + avgDeltas(i) :/= outputs(i).cols.toDouble + } + /* gradient */ + val gradientMatrices = new Array[BDM[Double]](topology.size) + for(i <- (topology.size - 1) until (0, -1)) { + /* TODO: GEMM? */ + gradientMatrices(i) = deltas(i) * outputs(i - 1).t + /* NB! dividing by the number of instances in + * the batch to be transparent for the optimizer */ + gradientMatrices(i) :/= outputs(i).cols.toDouble + } + (gradientMatrices, avgDeltas) + } +} + + +private class ANNLeastSquaresGradient(val topology: Array[Int], + val batchSize: Int = 1) extends Gradient with NeuralHelper { + + override def compute(data: Vector, label: Double, weights: Vector): (Vector, Double) = { + val gradient = Vectors.zeros(weights.size) + val loss = compute(data, label, weights, gradient) + (gradient, loss) + } + + override def compute(data: Vector, label: Double, weights: Vector, + cumGradient: Vector): Double = { + val arrData = data.toArray + val realBatchSize = arrData.length / (topology(0) + topology.last) + val input = new BDM(topology(0), realBatchSize, arrData) + val target = new BDM(topology.last, realBatchSize, arrData, topology(0) * realBatchSize) + val (weightMatrices, bias) = unrollWeights(weights) + /* forward run */ + val outputs = forwardRun(input, weightMatrices, bias) + /* error back propagation */ + val (gradientMatrices, deltas) = wGradient(weightMatrices, target, outputs) + rollWeights(gradientMatrices, deltas, cumGradient) + /* error */ + val diff = target :- outputs(topology.size - 1) + val outerError = Bsum(diff :* diff) / 2 + /* NB! dividing by the number of instances in + * the batch to be transparent for the optimizer */ + outerError / realBatchSize + } +} + +private class ANNUpdater extends Updater { + + override def compute(weightsOld: Vector, + gradient: Vector, + stepSize: Double, + iter: Int, + regParam: Double): (Vector, Double) = { + val thisIterStepSize = stepSize + val brzWeights: BV[Double] = weightsOld.toBreeze.toDenseVector + brzAxpy(-thisIterStepSize, gradient.toBreeze, brzWeights) + (Vectors.fromBreeze(brzWeights), 0) + } +} diff --git a/mllib/src/main/scala/org/apache/spark/mllib/classification/ANNClassifier.scala b/mllib/src/main/scala/org/apache/spark/mllib/classification/ANNClassifier.scala new file mode 100644 index 0000000000000..5376815094220 --- /dev/null +++ b/mllib/src/main/scala/org/apache/spark/mllib/classification/ANNClassifier.scala @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.mllib.classification + +import org.apache.spark.mllib.ann.{ArtificialNeuralNetworkModel, ArtificialNeuralNetwork} +import org.apache.spark.mllib.linalg.Vector +import org.apache.spark.mllib.regression.LabeledPoint +import org.apache.spark.rdd.RDD +import org.apache.spark.mllib.linalg.Vectors +import breeze.linalg.{argmax => Bargmax} + +import scala.util.Random + +trait ANNClassifierHelper { + + protected val labelToIndex: Map[Double, Int] + private val indexToLabel = labelToIndex.map(_.swap) + private val labelCount = labelToIndex.size + + protected def labeledPointToVectorPair(labeledPoint: LabeledPoint) = { + val output = Array.fill(labelCount){0.1} + output(labelToIndex(labeledPoint.label)) = 0.9 + (labeledPoint.features, Vectors.dense(output)) + } + + protected def outputToLabel(output: Vector): Double = { + val index = Bargmax(output.toBreeze.toDenseVector) + indexToLabel(index) + } +} + +class ANNClassifierModel private[mllib](val annModel: ArtificialNeuralNetworkModel, + val labelToIndex: Map[Double, Int]) + extends ClassificationModel with ANNClassifierHelper with Serializable { + /** + * Predict values for the given data set using the model trained. + * + * @param testData RDD representing data points to be predicted + * @return an RDD[Double] where each entry contains the corresponding prediction + */ + override def predict(testData: RDD[Vector]): RDD[Double] = testData.map(predict) + + /** + * Predict values for a single data point using the model trained. + * + * @param testData array representing a single data point + * @return predicted category from the trained model + */ + override def predict(testData: Vector): Double = { + val output = annModel.predict(testData) + outputToLabel(output) + } +} + +class ANNClassifier private(val labelToIndex: Map[Double, Int], + private val hiddenLayersTopology: Array[Int], + private val initialWeights: Vector, + private val maxIterations: Int, + private val stepSize: Double, + private val convergeTol: Double) + extends ANNClassifierHelper with Serializable { + + def run(data: RDD[LabeledPoint], batchSize: Int = 1): ANNClassifierModel = { + val annData = data.map(lp => labeledPointToVectorPair(lp)) + /* train the model */ + val model = ArtificialNeuralNetwork.train(annData, batchSize, hiddenLayersTopology, + initialWeights, maxIterations, convergeTol) + new ANNClassifierModel(model, labelToIndex) + } +} + +/** + * Top level methods for training the classifier based on artificial neural network (ANN) + */ +object ANNClassifier { + + private val defaultStepSize = 1.0 + private val defaultBatchSize = 1 + + /** + * Trains an ANN classifier. + * + * @param data RDD containing labeled points for training. + * @param batchSize batch size - number of instances to process in batch + * @param hiddenLayersTopology number of nodes per hidden layer, excluding the bias nodes. 
+   * @param maxIterations specifies maximum number of training iterations.
+   * @param convergenceTol convergence tolerance for LBFGS.
+   * @return ANN classifier model.
+   */
+  def train(data: RDD[LabeledPoint],
+      batchSize: Int,
+      hiddenLayersTopology: Array[Int],
+      maxIterations: Int,
+      convergenceTol: Double): ANNClassifierModel = {
+    val initialWeights = randomWeights(data, hiddenLayersTopology)
+    train(data, batchSize, hiddenLayersTopology,
+      initialWeights, maxIterations, defaultStepSize, convergenceTol)
+  }
+
+  /**
+   * Continues training of a pre-trained ANN classifier.
+   * Assumes that the data has the same labels as the data
+   * used for the original training, or at least a subset of
+   * those labels.
+   *
+   * @param data RDD containing labeled points for training.
+   * @param batchSize batch size - number of instances to process in batch
+   * @param model a pre-trained ANN classifier model.
+   * @param maxIterations specifies maximum number of training iterations.
+   * @param convergenceTol convergence tolerance for LBFGS.
+   * @return ANN classifier model.
+   */
+  def train(data: RDD[LabeledPoint],
+      batchSize: Int,
+      model: ANNClassifierModel,
+      maxIterations: Int,
+      convergenceTol: Double): ANNClassifierModel = {
+    val hiddenLayersTopology =
+      model.annModel.topology.slice(1, model.annModel.topology.length - 1)
+    new ANNClassifier(model.labelToIndex, hiddenLayersTopology,
+      model.annModel.weights, maxIterations, defaultStepSize, convergenceTol).run(data, batchSize)
+  }
+
+  /**
+   * Trains an ANN classifier.
+   *
+   * @param data RDD containing labeled points for training.
+   * @param batchSize batch size - number of instances to process in batch
+   * @param hiddenLayersTopology number of nodes per hidden layer, excluding the bias nodes.
+   * @param initialWeights initial weights of the underlying artificial neural network.
+   * @param maxIterations specifies maximum number of training iterations.
+   * @param stepSize step size (not implemented).
+   * @param convergenceTol convergence tolerance for LBFGS.
+   * @return ANN classifier model.
+   */
+  def train(data: RDD[LabeledPoint],
+      batchSize: Int,
+      hiddenLayersTopology: Array[Int],
+      initialWeights: Vector,
+      maxIterations: Int,
+      stepSize: Double,
+      convergenceTol: Double): ANNClassifierModel = {
+    val labelToIndex = data.map( lp => lp.label).distinct().collect().sorted.zipWithIndex.toMap
+    new ANNClassifier(labelToIndex, hiddenLayersTopology,
+      initialWeights, maxIterations, stepSize, convergenceTol).run(data, batchSize)
+  }
+
+  /**
+   * Trains an ANN classifier.
+   *
+   * @param data RDD containing labeled points for training.
+   * @param hiddenLayersTopology number of nodes per hidden layer, excluding the bias nodes.
+   * @param maxIterations specifies maximum number of training iterations.
+   * @param stepSize step size (not implemented).
+   * @param convergenceTol convergence tolerance for LBFGS.
+   * @return ANN classifier model.
+   */
+  def train(data: RDD[LabeledPoint],
+      hiddenLayersTopology: Array[Int],
+      maxIterations: Int,
+      stepSize: Double,
+      convergenceTol: Double): ANNClassifierModel = {
+    val initialWeights = randomWeights(data, hiddenLayersTopology)
+    train(data, defaultBatchSize, hiddenLayersTopology, initialWeights, maxIterations, stepSize,
+      convergenceTol)
+  }
+
+  /**
+   * Continues training of a pre-trained ANN classifier.
+   * Assumes that the data has the same labels as the data
+   * used for the original training, or at least a subset of
+   * those labels.
+   *
+   * @param data RDD containing labeled points for training.
+   * @param model a pre-trained ANN classifier model.
+ * @param maxIterations specifies maximum number of training iterations. + * @param stepSize step size (not implemented) + * @param convergenceTol convergence tolerance for LBFGS + * @return ANN classifier model. + */ + def train(data: RDD[LabeledPoint], + model: ANNClassifierModel, + maxIterations: Int, + stepSize: Double, + convergenceTol: Double): ANNClassifierModel = { + val hiddenLayersTopology = + model.annModel.topology.slice(1, model.annModel.topology.length - 1) + new ANNClassifier(model.labelToIndex, hiddenLayersTopology, + model.annModel.weights, maxIterations, stepSize, convergenceTol).run(data) + } + + /** + * Trains an ANN classifier with one hidden layer of size (featureCount / 2 + 1) + * with 2000 steps of size 1.0 and tolerance 1e-4 + * + * @param data RDD containing labeled points for training. + * @return ANN classifier model. + */ + def train(data: RDD[LabeledPoint]): ANNClassifierModel = { + val featureCount = data.first().features.size + val hiddenSize = featureCount / 2 + 1 + val hiddenLayersTopology = Array[Int](hiddenSize) + train(data, hiddenLayersTopology, 2000, 1.0, 1e-4) + } + + /** + * Returns random weights for the ANN classifier with the given hidden layers + * and data dimensionality, i.e. the weights for the following topology: + * [numFeatures -: hiddenLayers :- numLabels] + * + * @param data RDD containing labeled points for training. + * @param hiddenLayersTopology number of nodes per hidden layer, excluding the bias nodes. + * @param seed + * @return vector with random weights. + */ + def randomWeights(data: RDD[LabeledPoint], + hiddenLayersTopology: Array[Int], seed: Int): Vector = { + /* TODO: remove duplication - the same analysis will be done in ANNClassifier.run() */ + val labelCount = data.map( lp => lp.label).distinct().collect().length + val featureCount = data.first().features.size + ArtificialNeuralNetwork.randomWeights(featureCount, labelCount, hiddenLayersTopology, seed) + } + + /** + * Returns random weights for the ANN classifier with the given hidden layers + * and data dimensionality, i.e. the weights for the following topology: + * [numFeatures -: hiddenLayers :- numLabels] + * + * @param data RDD containing labeled points for training. + * @param hiddenLayersTopology number of nodes per hidden layer, excluding the bias nodes. + * @return vector with random weights. + */ + def randomWeights(data: RDD[LabeledPoint], hiddenLayersTopology: Array[Int]): Vector = { + randomWeights(data, hiddenLayersTopology, Random.nextInt()) + } +} diff --git a/mllib/src/test/scala/org/apache/spark/mllib/ann/ANNSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/ann/ANNSuite.scala new file mode 100644 index 0000000000000..2bccdc09f841a --- /dev/null +++ b/mllib/src/test/scala/org/apache/spark/mllib/ann/ANNSuite.scala @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.ann
+
+import org.apache.spark.mllib.linalg.{DenseVector, Vectors, Vector}
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+import org.apache.spark.util.random.XORShiftRandom
+import breeze.linalg.{DenseVector => BDV}
+
+import org.scalatest.FunSuite
+
+class ANNSuite extends FunSuite with MLlibTestSparkContext {
+
+  test("ANN learns XOR function") {
+    val inputs = Array[Array[Double]](
+      Array[Double](0, 0),
+      Array[Double](0, 1),
+      Array[Double](1, 0),
+      Array[Double](1, 1)
+    )
+    val outputs = Array[Double](0, 1, 1, 0)
+    val data = inputs.zip(outputs).map { case(features, label) =>
+      (Vectors.dense(features), Vectors.dense(Array(label)))}
+    val rddData = sc.parallelize(data, 2)
+    val hiddenLayersTopology = Array[Int](5)
+    val initialWeights = ArtificialNeuralNetwork.
+      randomWeights(rddData, hiddenLayersTopology, 0x01234567)
+    val model = ArtificialNeuralNetwork.
+      train(rddData, 4, hiddenLayersTopology, initialWeights, 200, 1e-4)
+    val predictionAndLabels = rddData.map { case(input, label) =>
+      (model.predict(input)(0), label(0)) }.collect()
+    assert(predictionAndLabels.forall { case(p, l) => (math.round(p) - l) == 0 })
+  }
+
+  /*
+    This test compares the output of the annGradient.compute function with the following
+    approximations:
+
+    dE / dw_k ~= ( E(w + eps*e_k, x) - E(w, x) ) / eps
+
+    where E(w, x) is the summed squared error multiplied by a factor 0.5, given weight vector
+    w and input x, w_k the k-th element in the weight vector (starting with k=0) and e_k the
+    associated k-th Cartesian unit vector.
+
+    The test is passed when the difference is less than accept=1e-7 with eps=1e-6.
+  */
+  test("Gradient of ANN") {
+    val eps = 1e-6
+    val accept = 1e-7
+    val topologyArr = Array[Array[Int]](
+      Array[Int](1, 5, 1),
+      Array[Int](5, 10, 5, 3),
+      Array[Int](128, 256, 128)
+    )
+    val rnd = new XORShiftRandom(0)
+    var cnt = 0
+    while (cnt < topologyArr.length) {
+      val topology = topologyArr(cnt)
+      val noInput = topology(0)
+      val noOutput = topology(topology.length - 1)
+      /* random input and target output, in the [input, output] layout expected by compute */
+      val data = Vectors.dense(Array.fill(noInput + noOutput)(rnd.nextDouble()))
+      val hiddenLayersTopology = topology.slice(1, topology.length - 1)
+      val weights = ArtificialNeuralNetwork.
+        randomWeights(noInput, noOutput, hiddenLayersTopology, rnd.nextInt())
+      val annGradient = new ANNLeastSquaresGradient(topology)
+      val (gradient, err) = annGradient.compute(data, 0.0, weights)
+      val weightsArr = weights.toArray
+      var k = 0
+      while (k < weightsArr.length) {
+        /* perturb weight k and compare the numerical with the analytical gradient */
+        val wPlus = weightsArr.clone()
+        wPlus(k) += eps
+        val errPlus = annGradient.compute(data, 0.0, Vectors.dense(wPlus))._2
+        assert(math.abs((errPlus - err) / eps - gradient(k)) < accept)
+        k += 1
+      }
+      cnt += 1
+    }
+  }
+}
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/ANNClassifierSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/ANNClassifierSuite.scala
new file mode 100644
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/ANNClassifierSuite.scala
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.mllib.classification
+
+import org.apache.spark.mllib.linalg.Vectors
+import org.apache.spark.mllib.regression.LabeledPoint
+import org.apache.spark.mllib.util.MLlibTestSparkContext
+
+import org.scalatest.FunSuite
+
+class ANNClassifierSuite extends FunSuite with MLlibTestSparkContext {
+
+  test("ANN classifier learns XOR function") {
+    val inputs = Array[Array[Double]](
+      Array[Double](0, 0),
+      Array[Double](0, 1),
+      Array[Double](1, 0),
+      Array[Double](1, 1)
+    )
+    val outputs = Array[Double](0, 1, 1, 0)
+    val data = inputs.zip(outputs).map { case(input, output) =>
+      new LabeledPoint(output, Vectors.dense(input))}
+    val rddData = sc.parallelize(data, 2)
+    val hiddenLayerTopology = Array[Int](5)
+    val initialWeights = ANNClassifier.randomWeights(rddData, hiddenLayerTopology, 0x01234567)
+    val model = ANNClassifier.train(rddData, 1, hiddenLayerTopology, initialWeights, 200, 1.0, 1e-4)
+    val predictionAndLabels = rddData.map(lp =>
+      (model.predict(lp.features), lp.label)).collect()
+    assert(predictionAndLabels.forall { case(p, l) =>
+      (p - l) == 0 })
+  }
+}