From 79ed52ac019bb271f4a1dec48f095f6d91ea79b9 Mon Sep 17 00:00:00 2001
From: Kan Zhang
Date: Fri, 18 Apr 2014 15:23:26 -0700
Subject: [PATCH 1/3] [SPARK-1460] Returning SchemaRDD on Set operations that
 do not change schema

Adding some missing functions

Adding a few more functions

Simply wrap overridden methods in base RDD class

Adding functions to Java API

Removing Scaladoc on overriding methods

Removing implicit conversions

Renaming wrapSchemaRDD to fromSchemaRDD

A better toString for JavaSchemaRDD

Use this.type as return type

Adding Python API changes for SchemaRDD

Replacing fromSchemaRDD() with toJavaSchemaRDD()

Avoid using this.type in Java API

Removing redundant methods in EdgeRDD and VertexRDD
---
 .../main/scala/org/apache/spark/rdd/RDD.scala |  10 +-
 .../org/apache/spark/graphx/EdgeRDD.scala     |  10 +-
 .../org/apache/spark/graphx/VertexRDD.scala   |  10 +-
 python/pyspark/sql.py                         |  29 ++++
 .../org/apache/spark/sql/SchemaRDD.scala      |  62 +++++++-
 .../spark/sql/api/java/JavaSchemaRDD.scala    | 140 ++++++++++++++++++
 6 files changed, 239 insertions(+), 22 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 3b3524f33e811..a1ca612cc9a09 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -128,7 +128,7 @@ abstract class RDD[T: ClassTag](
   @transient var name: String = null
 
   /** Assign a name to this RDD */
-  def setName(_name: String): RDD[T] = {
+  def setName(_name: String): this.type = {
    name = _name
    this
  }
@@ -138,7 +138,7 @@ abstract class RDD[T: ClassTag](
    * it is computed. This can only be used to assign a new storage level if the RDD does not
    * have a storage level set yet..
    */
-  def persist(newLevel: StorageLevel): RDD[T] = {
+  def persist(newLevel: StorageLevel): this.type = {
     // TODO: Handle changes of StorageLevel
     if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
       throw new UnsupportedOperationException(
@@ -152,10 +152,10 @@ abstract class RDD[T: ClassTag](
   }
 
   /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  def persist(): RDD[T] = persist(StorageLevel.MEMORY_ONLY)
+  def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
 
   /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  def cache(): RDD[T] = persist()
+  def cache(): this.type = persist()
 
   /**
    * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
    *
    * @param blocking Whether to block until all blocks are deleted.
    * @return This RDD.
    */
-  def unpersist(blocking: Boolean = true): RDD[T] = {
+  def unpersist(blocking: Boolean = true): this.type = {
     logInfo("Removing RDD " + id + " from persistence list")
     sc.unpersistRDD(id, blocking)
     storageLevel = StorageLevel.NONE
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
index 6d04bf790e3a5..fa78ca99b8891 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/EdgeRDD.scala
@@ -51,18 +51,12 @@ class EdgeRDD[@specialized ED: ClassTag](
 
   override def collect(): Array[Edge[ED]] = this.map(_.copy()).collect()
 
-  override def persist(newLevel: StorageLevel): EdgeRDD[ED] = {
+  override def persist(newLevel: StorageLevel): this.type = {
     partitionsRDD.persist(newLevel)
     this
   }
 
-  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  override def persist(): EdgeRDD[ED] = persist(StorageLevel.MEMORY_ONLY)
-
-  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  override def cache(): EdgeRDD[ED] = persist()
-
-  override def unpersist(blocking: Boolean = true): EdgeRDD[ED] = {
+  override def unpersist(blocking: Boolean = true): this.type = {
     partitionsRDD.unpersist(blocking)
     this
   }
diff --git a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
index d6788d4d4b9fd..f0fc605c88575 100644
--- a/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
+++ b/graphx/src/main/scala/org/apache/spark/graphx/VertexRDD.scala
@@ -71,18 +71,12 @@ class VertexRDD[@specialized VD: ClassTag](
   override protected def getPreferredLocations(s: Partition): Seq[String] =
     partitionsRDD.preferredLocations(s)
 
-  override def persist(newLevel: StorageLevel): VertexRDD[VD] = {
+  override def persist(newLevel: StorageLevel): this.type = {
     partitionsRDD.persist(newLevel)
     this
   }
 
-  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  override def persist(): VertexRDD[VD] = persist(StorageLevel.MEMORY_ONLY)
-
-  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
-  override def cache(): VertexRDD[VD] = persist()
-
-  override def unpersist(blocking: Boolean = true): VertexRDD[VD] = {
+  override def unpersist(blocking: Boolean = true): this.type = {
     partitionsRDD.unpersist(blocking)
     this
   }
diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 1a62031db5c41..6789d7002b3b7 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -360,6 +360,35 @@ def getCheckpointFile(self):
         else:
             return None
 
+    def coalesce(self, numPartitions, shuffle=False):
+        rdd = self._jschema_rdd.coalesce(numPartitions, shuffle)
+        return SchemaRDD(rdd, self.sql_ctx)
+
+    def distinct(self):
+        rdd = self._jschema_rdd.distinct()
+        return SchemaRDD(rdd, self.sql_ctx)
+
+    def intersection(self, other):
+        if (other.__class__ is SchemaRDD):
+            rdd = self._jschema_rdd.intersection(other._jschema_rdd)
+            return SchemaRDD(rdd, self.sql_ctx)
+        else:
+            raise ValueError("Can only intersect with another SchemaRDD")
+
+    def repartition(self, numPartitions):
+        rdd = self._jschema_rdd.repartition(numPartitions)
+        return SchemaRDD(rdd, self.sql_ctx)
+
+    def subtract(self, other, numPartitions=None):
+        if (other.__class__ is SchemaRDD):
+            if numPartitions is None:
+                rdd = self._jschema_rdd.subtract(other._jschema_rdd)
+            else:
+                rdd = self._jschema_rdd.subtract(other._jschema_rdd, numPartitions)
+            return SchemaRDD(rdd, self.sql_ctx)
+        else:
+            raise ValueError("Can only subtract another SchemaRDD")
+
 def _test():
     import doctest
     from pyspark.context import SparkContext
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index d7782d6b32819..6685b06764cf7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -19,14 +19,16 @@ package org.apache.spark.sql
 
 import net.razorvine.pickle.Pickler
 
-import org.apache.spark.{Dependency, OneToOneDependency, Partition, TaskContext}
+import org.apache.spark.{Dependency, OneToOneDependency, Partition, Partitioner, TaskContext}
 import org.apache.spark.annotation.{AlphaComponent, Experimental}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.api.java.JavaSchemaRDD
 import org.apache.spark.sql.catalyst.analysis._
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.{Inner, JoinType}
 import org.apache.spark.sql.catalyst.types.BooleanType
+import org.apache.spark.sql.execution.{ExistingRdd, SparkLogicalPlan}
 import org.apache.spark.api.java.JavaRDD
 import java.util.{Map => JMap}
@@ -296,6 +298,13 @@ class SchemaRDD(
    */
   def toSchemaRDD = this
 
+  /**
+   * Returns this RDD as a JavaSchemaRDD.
+   *
+   * @group schema
+   */
+  def toJavaSchemaRDD: JavaSchemaRDD = new JavaSchemaRDD(sqlContext, logicalPlan)
+
   private[sql] def javaToPython: JavaRDD[Array[Byte]] = {
     val fieldNames: Seq[String] = this.queryExecution.analyzed.output.map(_.name)
     this.mapPartitions { iter =>
@@ -314,4 +323,55 @@ class SchemaRDD(
       }
     }
   }
+
+  /**
+   * Creates a SchemaRDD by applying this RDD's schema to a derived RDD. Typically used to wrap
+   * the return value of base RDD functions that do not change the schema.
+   *
+   * @param rdd an RDD derived from this one that has the same schema
+   *
+   * @group schema
+   */
+  private def applySchema(rdd: RDD[Row]): SchemaRDD = {
+    new SchemaRDD(sqlContext, SparkLogicalPlan(ExistingRdd(logicalPlan.output, rdd)))
+  }
+
+  // =======================================================================
+  // Base RDD functions that do NOT change schema
+  // =======================================================================
+
+  // Transformations (return a new RDD)
+
+  override def coalesce(numPartitions: Int, shuffle: Boolean = false): SchemaRDD =
+    applySchema(super.coalesce(numPartitions, shuffle))
+
+  override def distinct(): SchemaRDD =
+    applySchema(super.distinct())
+
+  override def distinct(numPartitions: Int): SchemaRDD =
+    applySchema(super.distinct(numPartitions))
+
+  override def filter(f: Row => Boolean): SchemaRDD =
+    applySchema(super.filter(f))
+
+  override def intersection(other: RDD[Row]): SchemaRDD =
+    applySchema(super.intersection(other))
+
+  override def intersection(other: RDD[Row], partitioner: Partitioner): SchemaRDD =
+    applySchema(super.intersection(other, partitioner))
+
+  override def intersection(other: RDD[Row], numPartitions: Int): SchemaRDD =
+    applySchema(super.intersection(other, numPartitions))
+
+  override def repartition(numPartitions: Int): SchemaRDD =
+    applySchema(super.repartition(numPartitions))
+
+  override def subtract(other: RDD[Row]): SchemaRDD =
+    applySchema(super.subtract(other))
+
+  override def subtract(other: RDD[Row], numPartitions: Int): SchemaRDD =
+    applySchema(super.subtract(other, numPartitions))
+
+  override def subtract(other: RDD[Row], p: Partitioner): SchemaRDD =
+    applySchema(super.subtract(other, p))
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
index d43d672938f51..22f57b758dd02 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/api/java/JavaSchemaRDD.scala
@@ -17,10 +17,13 @@
 
 package org.apache.spark.sql.api.java
 
+import org.apache.spark.Partitioner
 import org.apache.spark.api.java.{JavaRDDLike, JavaRDD}
+import org.apache.spark.api.java.function.{Function => JFunction}
 import org.apache.spark.sql.{SQLContext, SchemaRDD, SchemaRDDLike}
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
 
 /**
  * An RDD of [[Row]] objects that is returned as the result of a Spark SQL query. In addition to
@@ -45,4 +48,141 @@ class JavaSchemaRDD(
   override def wrapRDD(rdd: RDD[Row]): JavaRDD[Row] = JavaRDD.fromRDD(rdd)
 
   val rdd = baseSchemaRDD.map(new Row(_))
+
+  override def toString: String = baseSchemaRDD.toString
+
+  // =======================================================================
+  // Base RDD functions that do NOT change schema
+  // =======================================================================
+
+  // Common RDD functions
+
+  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
+  def cache(): JavaSchemaRDD = {
+    baseSchemaRDD.cache()
+    this
+  }
+
+  /** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
+  def persist(): JavaSchemaRDD = {
+    baseSchemaRDD.persist()
+    this
+  }
+
+  /**
+   * Set this RDD's storage level to persist its values across operations after the first time
+   * it is computed. This can only be used to assign a new storage level if the RDD does not
+   * have a storage level set yet.
+   */
+  def persist(newLevel: StorageLevel): JavaSchemaRDD = {
+    baseSchemaRDD.persist(newLevel)
+    this
+  }
+
+  /**
+   * Mark the RDD as non-persistent, and remove all blocks for it from memory and disk.
+   *
+   * @param blocking Whether to block until all blocks are deleted.
+   * @return This RDD.
+   */
+  def unpersist(blocking: Boolean = true): JavaSchemaRDD = {
+    baseSchemaRDD.unpersist(blocking)
+    this
+  }
+
+  /** Assign a name to this RDD */
+  def setName(name: String): JavaSchemaRDD = {
+    baseSchemaRDD.setName(name)
+    this
+  }
+
+  // Transformations (return a new RDD)
+
+  /**
+   * Return a new RDD that is reduced into `numPartitions` partitions.
+   */
+  def coalesce(numPartitions: Int, shuffle: Boolean = false): JavaSchemaRDD =
+    baseSchemaRDD.coalesce(numPartitions, shuffle).toJavaSchemaRDD
+
+  /**
+   * Return a new RDD containing the distinct elements in this RDD.
+   */
+  def distinct(): JavaSchemaRDD =
+    baseSchemaRDD.distinct().toJavaSchemaRDD
+
+  /**
+   * Return a new RDD containing the distinct elements in this RDD.
+   */
+  def distinct(numPartitions: Int): JavaSchemaRDD =
+    baseSchemaRDD.distinct(numPartitions).toJavaSchemaRDD
+
+  /**
+   * Return a new RDD containing only the elements that satisfy a predicate.
+   */
+  def filter(f: JFunction[Row, java.lang.Boolean]): JavaSchemaRDD =
+    baseSchemaRDD.filter(x => f.call(new Row(x)).booleanValue()).toJavaSchemaRDD
+
+  /**
+   * Return the intersection of this RDD and another one. The output will not contain any
+   * duplicate elements, even if the input RDDs did.
+   *
+   * Note that this method performs a shuffle internally.
+   */
+  def intersection(other: JavaSchemaRDD): JavaSchemaRDD =
+    this.baseSchemaRDD.intersection(other.baseSchemaRDD).toJavaSchemaRDD
+
+  /**
+   * Return the intersection of this RDD and another one. The output will not contain any
+   * duplicate elements, even if the input RDDs did.
+   *
+   * Note that this method performs a shuffle internally.
+   *
+   * @param partitioner Partitioner to use for the resulting RDD
+   */
+  def intersection(other: JavaSchemaRDD, partitioner: Partitioner): JavaSchemaRDD =
+    this.baseSchemaRDD.intersection(other.baseSchemaRDD, partitioner).toJavaSchemaRDD
+
+  /**
+   * Return the intersection of this RDD and another one. The output will not contain any
+   * duplicate elements, even if the input RDDs did. Performs a hash partition across the cluster.
+   *
+   * Note that this method performs a shuffle internally.
+   *
+   * @param numPartitions How many partitions to use in the resulting RDD
+   */
+  def intersection(other: JavaSchemaRDD, numPartitions: Int): JavaSchemaRDD =
+    this.baseSchemaRDD.intersection(other.baseSchemaRDD, numPartitions).toJavaSchemaRDD
+
+  /**
+   * Return a new RDD that has exactly `numPartitions` partitions.
+   *
+   * Can increase or decrease the level of parallelism in this RDD. Internally, this uses
+   * a shuffle to redistribute data.
+   *
+   * If you are decreasing the number of partitions in this RDD, consider using `coalesce`,
+   * which can avoid performing a shuffle.
+   */
+  def repartition(numPartitions: Int): JavaSchemaRDD =
+    baseSchemaRDD.repartition(numPartitions).toJavaSchemaRDD
+
+  /**
+   * Return an RDD with the elements from `this` that are not in `other`.
+   *
+   * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
+   * RDD will be no larger than this one.
+   */
+  def subtract(other: JavaSchemaRDD): JavaSchemaRDD =
+    this.baseSchemaRDD.subtract(other.baseSchemaRDD).toJavaSchemaRDD
+
+  /**
+   * Return an RDD with the elements from `this` that are not in `other`.
+   */
+  def subtract(other: JavaSchemaRDD, numPartitions: Int): JavaSchemaRDD =
+    this.baseSchemaRDD.subtract(other.baseSchemaRDD, numPartitions).toJavaSchemaRDD
+
+  /**
+   * Return an RDD with the elements from `this` that are not in `other`.
+   */
+  def subtract(other: JavaSchemaRDD, p: Partitioner): JavaSchemaRDD =
+    this.baseSchemaRDD.subtract(other.baseSchemaRDD, p).toJavaSchemaRDD
 }

From 91dc787edbdca2750ea280f3830e3480774156d2 Mon Sep 17 00:00:00 2001
From: Kan Zhang
Date: Tue, 6 May 2014 12:42:29 -0700
Subject: [PATCH 2/3] Taking into account newly added Ordering param

---
 .../org/apache/spark/sql/SchemaRDD.scala      | 25 +++++++++++--------
 1 file changed, 15 insertions(+), 10 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
index 6685b06764cf7..34200be3ac955 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SchemaRDD.scala
@@ -342,14 +342,16 @@ class SchemaRDD(
 
   // Transformations (return a new RDD)
 
-  override def coalesce(numPartitions: Int, shuffle: Boolean = false): SchemaRDD =
-    applySchema(super.coalesce(numPartitions, shuffle))
+  override def coalesce(numPartitions: Int, shuffle: Boolean = false)
+                       (implicit ord: Ordering[Row] = null): SchemaRDD =
+    applySchema(super.coalesce(numPartitions, shuffle)(ord))
 
   override def distinct(): SchemaRDD =
     applySchema(super.distinct())
 
-  override def distinct(numPartitions: Int): SchemaRDD =
-    applySchema(super.distinct(numPartitions))
+  override def distinct(numPartitions: Int)
+                       (implicit ord: Ordering[Row] = null): SchemaRDD =
+    applySchema(super.distinct(numPartitions)(ord))
 
   override def filter(f: Row => Boolean): SchemaRDD =
     applySchema(super.filter(f))
@@ -357,14 +359,16 @@ class SchemaRDD(
   override def intersection(other: RDD[Row]): SchemaRDD =
     applySchema(super.intersection(other))
 
-  override def intersection(other: RDD[Row], partitioner: Partitioner): SchemaRDD =
-    applySchema(super.intersection(other, partitioner))
+  override def intersection(other: RDD[Row], partitioner: Partitioner)
+                           (implicit ord: Ordering[Row] = null): SchemaRDD =
+    applySchema(super.intersection(other, partitioner)(ord))
 
   override def intersection(other: RDD[Row], numPartitions: Int): SchemaRDD =
     applySchema(super.intersection(other, numPartitions))
 
-  override def repartition(numPartitions: Int): SchemaRDD =
-    applySchema(super.repartition(numPartitions))
+  override def repartition(numPartitions: Int)
+                          (implicit ord: Ordering[Row] = null): SchemaRDD =
+    applySchema(super.repartition(numPartitions)(ord))
 
   override def subtract(other: RDD[Row]): SchemaRDD =
     applySchema(super.subtract(other))
@@ -372,6 +376,7 @@ class SchemaRDD(
   override def subtract(other: RDD[Row], numPartitions: Int): SchemaRDD =
     applySchema(super.subtract(other, numPartitions))
 
-  override def subtract(other: RDD[Row], p: Partitioner): SchemaRDD =
-    applySchema(super.subtract(other, p))
+  override def subtract(other: RDD[Row], p: Partitioner)
+                       (implicit ord: Ordering[Row] = null): SchemaRDD =
+    applySchema(super.subtract(other, p)(ord))
 }

From 111e3882f55d5bdd06b357512a0d4284adb78116 Mon Sep 17 00:00:00 2001
From: Kan Zhang
Date: Tue, 6 May 2014 17:45:24 -0700
Subject: [PATCH 3/3] silence MiMa errors in EdgeRDD and VertexRDD

---
 project/MimaBuild.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/project/MimaBuild.scala b/project/MimaBuild.scala
index d540dc0a986e9..efdb38e907d14 100644
--- a/project/MimaBuild.scala
+++ b/project/MimaBuild.scala
@@ -74,6 +74,8 @@ object MimaBuild {
       ) ++
       excludeSparkClass("rdd.ClassTags") ++
       excludeSparkClass("util.XORShiftRandom") ++
+      excludeSparkClass("graphx.EdgeRDD") ++
+      excludeSparkClass("graphx.VertexRDD") ++
       excludeSparkClass("mllib.recommendation.MFDataGenerator") ++
       excludeSparkClass("mllib.optimization.SquaredGradient") ++
       excludeSparkClass("mllib.regression.RidgeRegressionWithSGD") ++
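
The sketches below illustrate the techniques this series relies on. They use toy names and are not part of the patches themselves.

PATCH 1/3 changes setName, persist, cache, and unpersist to return `this.type` instead of `RDD[T]`. `this.type` is the singleton type of the receiver, so a call on a subclass instance is statically typed as that subclass and chained calls no longer erase the subtype. A minimal standalone sketch of the idea, with illustrative class names rather than Spark's:

    // Toy sketch of the this.type pattern; Base/Derived are illustrative names.
    abstract class Base {
      // this.type is the singleton type of the receiver, so a call on a
      // subclass instance is statically typed as that subclass.
      def cache(): this.type = {
        // (persistence bookkeeping would go here)
        this
      }
    }

    class Derived extends Base {
      def derivedOnly: String = "typed as Derived, not Base"
    }

    object ThisTypeDemo extends App {
      val d = new Derived
      // Compiles without a cast: d.cache() is a Derived, not a Base.
      println(d.cache().derivedOnly)
    }

This is why the redundant persist/cache overrides in EdgeRDD and VertexRDD could be deleted: the base-class versions now already return the subtype.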
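The Scala-side core of PATCH 1/3 is the private applySchema helper in SchemaRDD.scala: a base RDD transformation that cannot alter row structure is run on the underlying `RDD[Row]`, and the result is re-wrapped with the original logical plan's output so the caller gets a SchemaRDD back. A self-contained sketch of the same wrap-and-rewrap shape, using a stand-in Schema type in place of Catalyst's logical plans:

    // Stand-in types; the real patch re-wraps with
    // SparkLogicalPlan(ExistingRdd(logicalPlan.output, rdd)).
    final case class Schema(fieldNames: Seq[String])

    class SchemaRows(val schema: Schema, val rows: Seq[Seq[Any]]) {
      // Re-apply this collection's schema to a derived row set.
      private def applySchema(derived: Seq[Seq[Any]]): SchemaRows =
        new SchemaRows(schema, derived)

      // Row-preserving operations return SchemaRows, not a plain Seq,
      // mirroring how SchemaRDD overrides filter/distinct/subtract/etc.
      def filter(p: Seq[Any] => Boolean): SchemaRows = applySchema(rows.filter(p))
      def distinct(): SchemaRows = applySchema(rows.distinct)
      def subtract(other: SchemaRows): SchemaRows =
        applySchema(rows.filterNot(other.rows.toSet))
    }

The Python methods added to pyspark/sql.py follow the same shape: each delegates to the wrapped Java SchemaRDD and re-wraps the returned RDD in a Python SchemaRDD with the same sql_ctx.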
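On the Java API side the commit message notes "Avoid using this.type in Java API": JavaSchemaRDD instead declares JavaSchemaRDD as the concrete return type, delegating each call to baseSchemaRDD and converting back with toJavaSchemaRDD, or returning `this` for the persistence-style calls. The delegation shape, again with illustrative stand-in names:

    // Stand-ins for SchemaRDD / JavaSchemaRDD showing only the wrapper shape.
    class CoreRows(val rows: Seq[Seq[Any]]) {
      def distinct(): CoreRows = new CoreRows(rows.distinct)
      def toJavaRows: JavaRows = new JavaRows(this)
    }

    class JavaRows(val base: CoreRows) {
      // Transformation: delegate, then re-wrap so Java callers stay in JavaRows.
      def distinct(): JavaRows = base.distinct().toJavaRows

      // Persistence-style call: act on the wrapped core, return this wrapper,
      // spelled as JavaRows rather than this.type for Java-friendliness.
      def cache(): JavaRows = {
        // (base.cache() would go here in the real class)
        this
      }
    }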
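PATCH 2/3 exists because the base RDD methods coalesce, distinct(n), intersection(_, partitioner), repartition, and subtract(_, partitioner) had just gained a curried `(implicit ord: Ordering[T] = null)` parameter list: an override must declare the same second parameter list, and should forward the received ordering rather than drop it. A sketch of that obligation with toy types:

    // Toy base/derived pair showing why the overrides in PATCH 2/3 grew a
    // second parameter list: the override must match it and forward `ord`.
    abstract class Rows[T] {
      def repartition(n: Int)(implicit ord: Ordering[T] = null): Rows[T]
    }

    class NamedRows extends Rows[String] {
      override def repartition(n: Int)(implicit ord: Ordering[String] = null): NamedRows = {
        // Pass `ord` along explicitly when calling into the base machinery,
        // as the real code does with applySchema(super.repartition(n)(ord)).
        this
      }
    }

PATCH 3/3 then silences MiMa (Spark's binary-compatibility checker) for EdgeRDD and VertexRDD, since removing their persist/cache overrides changes those classes' binary signatures even though source compatibility is preserved.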