Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.api.java.function;

import java.io.Serializable;

/**
* A four-argument function that takes arguments of type T1, T2, T3, and T4 and returns an R.
*/
public interface Function4<T1, T2, T3, T4, R> extends Serializable {
public R call(T1 v1, T2 v2, T3 v3, T4 v4) throws Exception;
}
15 changes: 14 additions & 1 deletion core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ import org.apache.spark.SparkContext.rddToPairRDDFunctions
import org.apache.spark.annotation.Experimental
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.api.java.JavaUtils.mapAsSerializableJavaMap
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2, PairFunction}
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2,
Function3 => JFunction3, Function4 => JFunction4, PairFunction, _}
import org.apache.spark.partial.{BoundedDouble, PartialResult}
import org.apache.spark.rdd.{OrderedRDDFunctions, RDD}
import org.apache.spark.storage.StorageLevel
Expand Down Expand Up @@ -991,6 +992,18 @@ object JavaPairRDD {

implicit def toRDD[K, V](rdd: JavaPairRDD[K, V]): RDD[(K, V)] = rdd.rdd

private[spark]
implicit def toScalaFunction4[T1, T2, T3, T4, R](
fun: JFunction4[T1, T2, T3, T4, R]): Function4[T1, T2, T3, T4, R] = {
(x1: T1, x2: T2, x3: T3, x4: T4) => fun.call(x1, x2, x3, x4)
}

private[spark]
implicit def toScalaFunction3[T1, T2, T3, R](
fun: JFunction3[T1, T2, T3, R]): Function3[T1, T2, T3, R] = {
(x1: T1, x2: T2, x3: T3) => fun.call(x1, x2, x3)
}

private[spark]
implicit def toScalaFunction2[T1, T2, R](fun: JFunction2[T1, T2, R]): Function2[T1, T2, R] = {
(x: T1, x1: T2) => fun.call(x, x1)
Expand Down
2 changes: 1 addition & 1 deletion graphx/src/main/scala/org/apache/spark/graphx/Graph.scala
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ object Graph {
* @param vertexStorageLevel the desired storage level at which to cache the vertices if necessary
*
* @return a graph with edge attributes containing either the count of duplicate edges or 1
* (if `uniqueEdges` is `None`) and vertex attributes containing the total degree of each vertex.
* (if `uniqueEdges` is `None`) and all vertex attributes set to `defaultValue`.
*/
def fromEdgeTuples[VD: ClassTag](
rawEdges: RDD[(VertexId, VertexId)],
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.graphx.api.java

import java.lang.{Integer => JInt, Long => JLong, Boolean => JBoolean}

import scala.collection.JavaConversions._
import scala.language.implicitConversions
import scala.reflect.ClassTag

import org.apache.spark.api.java.JavaPairRDD._
import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.JavaSparkContext.fakeClassTag
import org.apache.spark.api.java.JavaUtils
import org.apache.spark.api.java.function.{Function => JFunction, Function2 => JFunction2,
Function3 => JFunction3, Function4 => JFunction4, _}
import org.apache.spark.graphx._

class JavaEdgeRDD[ED, VD](override val rdd: EdgeRDD[ED, VD])(
implicit val edTag: ClassTag[ED], val vdTag: ClassTag[VD])
extends JavaRDD[Edge[ED]](rdd) {

/**
* Map the values in an edge partitioning preserving the structure but changing the values.
*
* @tparam ED2 the new edge value type
* @param f the function from an edge to a new edge value
* @return a new EdgeRDD containing the new edge values
*/
def mapValues[ED2](f: JFunction[Edge[ED], ED2]): JavaEdgeRDD[ED2, VD] = {
implicit val ed2Tag: ClassTag[ED2] = fakeClassTag
rdd.mapValues(f)
}

/**
* Reverse all the edges in this RDD.
*
* @return a new EdgeRDD containing all the edges reversed
*/
def reverse(): JavaEdgeRDD[ED, VD] = rdd.reverse

/** Removes all edges but those matching `epred` and where both vertices match `vpred`. */
def filter(
epred: JFunction[EdgeTriplet[VD, ED], JBoolean],
vpred: JFunction2[JLong, VD, JBoolean]): JavaEdgeRDD[ED, VD] =
rdd.filter(et => epred.call(et), (id, attr) => vpred.call(id, attr))

/**
* Inner joins this EdgeRDD with another EdgeRDD, assuming both are partitioned using the same
* [[PartitionStrategy]].
*
* @param other the EdgeRDD to join with
* @param f the join function applied to corresponding values of `this` and `other`
* @return a new EdgeRDD containing only edges that appear in both `this` and `other`,
* with values supplied by `f`
*/
def innerJoin[ED2, ED3](
other: JavaEdgeRDD[ED2, _],
f: JFunction4[JLong, JLong, ED, ED2, ED3]): JavaEdgeRDD[ED3, VD] = {
implicit val ed2Tag: ClassTag[ED2] = fakeClassTag
implicit val ed3Tag: ClassTag[ED3] = fakeClassTag
rdd.innerJoin(other) { (src, dst, a, b) => f(src, dst, a, b) }
}
}

object JavaEdgeRDD {

def fromEdges[ED, VD](edges: JavaRDD[Edge[ED]]): JavaEdgeRDD[ED, VD] = {
implicit val edTag: ClassTag[ED] = fakeClassTag
implicit val vdTag: ClassTag[VD] = fakeClassTag
fromEdgeRDD(EdgeRDD.fromEdges(edges))
}

implicit def fromEdgeRDD[ED: ClassTag, VD: ClassTag](rdd: EdgeRDD[ED, VD]): JavaEdgeRDD[ED, VD] =
new JavaEdgeRDD[ED, VD](rdd)

implicit def toEdgeRDD[ED, VD](rdd: JavaEdgeRDD[ED, VD]): EdgeRDD[ED, VD] =
rdd.rdd
}
Loading