2 changes: 0 additions & 2 deletions bagel/src/test/scala/org/apache/spark/bagel/BagelSuite.scala
@@ -38,8 +38,6 @@ class BagelSuite extends FunSuite with Assertions with BeforeAndAfter with Timeouts {
       sc.stop()
       sc = null
     }
-    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
-    System.clearProperty("spark.driver.port")
   }

   test("halting by voting") {
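The same deletion recurs in nearly every file below: test teardown no longer clears spark.driver.port. A minimal sketch of the lifecycle these suites exercise, assuming back-to-back SparkContexts in one JVM now bind cleanly without the manual step (the app names are illustrative):

    import org.apache.spark.SparkContext

    // Suites used to clear spark.driver.port between contexts so Akka would
    // not try to rebind the next context to the stale port; this diff assumes
    // that step is no longer necessary.
    val first = new SparkContext("local", "first")
    first.stop()
    // previously: System.clearProperty("spark.driver.port")
    val second = new SparkContext("local", "second")
    second.stop()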
38 changes: 38 additions & 0 deletions core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -156,3 +156,41 @@ class RangePartitioner[K : Ordering : ClassTag, V](
       false
   }
 }
+
+/**
+ * A [[org.apache.spark.Partitioner]] that partitions records into evenly sized partitions
+ * by assigning keys to partitions in a round-robin fashion.
+ */
+class BalancedPartitioner[K : Ordering : ClassTag, V](
+    partitions: Int,
+    @transient rdd: RDD[_ <: Product2[K, V]])
+  extends Partitioner {
+
+  // This array tracks how many keys have been assigned to each partition:
+  // counts(0) is the number of keys in partition 0, and so on.
+  private val counts: Array[Int] = {
+    new Array[Int](numPartitions)
+  }
+
+  def numPartitions = math.abs(partitions)
+
+  var currPartition = 0
+
+  /*
+   * Assign the key to the current partition and advance the cursor round-robin.
+   */
+  def getPartition(key: Any): Int = {
+    val partition = currPartition
+    counts(partition) += 1
+    currPartition = (currPartition + 1) % numPartitions
+    partition
+  }
+
+  override def equals(other: Any): Boolean = other match {
+    case r: BalancedPartitioner[_, _] =>
+      (r.counts.sameElements(counts)
+        && r.currPartition == currPartition)
+    case _ =>
+      false
+  }
+}
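For context, a minimal usage sketch of the new partitioner (not part of the diff). The skewed key, the three-way split, and the single upstream partition are illustrative assumptions chosen so the round-robin assignment is deterministic:

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.SparkContext._ // pair-RDD implicits

    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("balanced-demo"))
    // Every record shares one key, so a HashPartitioner would send all nine
    // records to a single partition.
    val pairs = sc.parallelize(Seq.fill(9)(("hot-key", 1)), numSlices = 1)
    val balanced = pairs.partitionBy(new BalancedPartitioner(3, pairs))
    // With one upstream partition, round-robin yields three records apiece.
    println(balanced.glom().map(_.length).collect().mkString(", ")) // 3, 3, 3
    sc.stop()

Note that getPartition is stateful, so each shuffle task round-robins with its own copy of the partitioner; the balance is per upstream partition rather than global.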
2 changes: 0 additions & 2 deletions core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -65,8 +65,6 @@ public void setUp() {
   public void tearDown() {
     sc.stop();
     sc = null;
-    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
-    System.clearProperty("spark.driver.port");
   }

   static class ReverseIntComparator implements Comparator<Integer>, Serializable {
4 changes: 0 additions & 4 deletions core/src/test/scala/org/apache/spark/AkkaUtilsSuite.scala
@@ -39,7 +39,6 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
     val hostname = "localhost"
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
       conf = conf, securityManager = securityManager)
-    System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext
     System.setProperty("spark.hostPort", hostname + ":" + boundPort)
     assert(securityManager.isAuthenticationEnabled() === true)

@@ -77,7 +76,6 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
     val hostname = "localhost"
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
       conf = conf, securityManager = securityManager)
-    System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext
     System.setProperty("spark.hostPort", hostname + ":" + boundPort)

     assert(securityManager.isAuthenticationEnabled() === false)
@@ -129,7 +127,6 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
     val hostname = "localhost"
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
       conf = conf, securityManager = securityManager)
-    System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext
     System.setProperty("spark.hostPort", hostname + ":" + boundPort)

     assert(securityManager.isAuthenticationEnabled() === true)
@@ -182,7 +179,6 @@ class AkkaUtilsSuite extends FunSuite with LocalSparkContext {
     val hostname = "localhost"
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0,
       conf = conf, securityManager = securityManager)
-    System.setProperty("spark.driver.port", boundPort.toString) // Will be cleared by LocalSparkContext
     System.setProperty("spark.hostPort", hostname + ":" + boundPort)

     assert(securityManager.isAuthenticationEnabled() === true)
@@ -124,9 +124,6 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
     val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, 0, conf = conf,
       securityManager = new SecurityManager(conf))

-    // Will be cleared by LocalSparkContext
-    System.setProperty("spark.driver.port", boundPort.toString)
-
     val masterTracker = new MapOutputTrackerMaster(conf)
     masterTracker.trackerActor = actorSystem.actorOf(
       Props(new MapOutputTrackerMasterActor(masterTracker, conf)), "MapOutputTracker")
28 changes: 28 additions & 0 deletions core/src/test/scala/org/apache/spark/PartitioningSuite.scala
@@ -66,6 +66,34 @@ class PartitioningSuite extends FunSuite with SharedSparkContext with PrivateMethodTester {
     assert(descendingP4 != p4)
   }

+  test("BalancedPartitioner equality") {
+    // The key distribution is irrelevant here: BalancedPartitioner equality
+    // depends only on its internal state, not on the contents of the RDD.
+    val rdd = sc.parallelize(1.to(4000)).map(x => (x, x))
+
+    val p2 = new BalancedPartitioner(2, rdd)
+    val p4 = new BalancedPartitioner(4, rdd)
+    val anotherP4 = new BalancedPartitioner(4, rdd)
+
+    assert(p2 === p2)
+    assert(p4 === p4)
+    assert(p2 != p4)
+    assert(p4 != p2)
+    assert(p4 === anotherP4)
+    assert(anotherP4 === p4)
+  }
+
+  test("BalancedPartitioner getPartition") {
+    val rdd = sc.parallelize(1.to(2000)).map(x => (x, x))
+    val partitioner = new BalancedPartitioner(4, rdd)
+    var expectedPartition = 0
+    1.to(2000).foreach { element =>
+      val partition = partitioner.getPartition(element)
+      assert(partition === expectedPartition)
+      expectedPartition = (expectedPartition + 1) % 4
+    }
+  }
+
   test("RangePartitioner getPartition") {
     val rdd = sc.parallelize(1.to(2000)).map(x => (x, x))
     // We have different behaviour of getPartition for partitions with less than 1000 and more than
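As a quick illustration of the equality semantics the new tests pin down, a hedged sketch (any pair RDD works; the names are illustrative): two fresh instances with the same partition count compare equal, and diverge as soon as one of them assigns a key, because equals compares both the per-partition counts and the round-robin cursor.

    val rdd = sc.parallelize(1 to 10).map(x => (x, x))
    val a = new BalancedPartitioner(4, rdd)
    val b = new BalancedPartitioner(4, rdd)
    assert(a == b)    // same counts array and cursor
    a.getPartition(1) // mutates a's internal state
    assert(a != b)    // counts now differ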
@@ -78,8 +78,6 @@ class BlockManagerSuite extends FunSuite with BeforeAndAfter with PrivateMethodTester {
   }

   after {
-    System.clearProperty("spark.driver.port")
-
     if (store != null) {
       store.stop()
       store = null
@@ -57,8 +57,6 @@ public void setUp() {
   public void tearDown() {
     sc.stop();
     sc = null;
-    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
-    System.clearProperty("spark.driver.port");
   }

   @Test
@@ -38,8 +38,6 @@ trait LocalSparkContext {
       f(sc)
     } finally {
       sc.stop()
-      // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
-      System.clearProperty("spark.driver.port")
     }
   }
 }
@@ -42,7 +42,6 @@ public void setUp() {
   public void tearDown() {
     sc.stop();
     sc = null;
-    System.clearProperty("spark.driver.port");
   }

   int validatePrediction(List<LabeledPoint> validationData, LogisticRegressionModel model) {
@@ -42,7 +42,6 @@ public void setUp() {
   public void tearDown() {
     sc.stop();
     sc = null;
-    System.clearProperty("spark.driver.port");
   }

   private static final List<LabeledPoint> POINTS = Arrays.asList(
@@ -41,7 +41,6 @@ public void setUp() {
   public void tearDown() {
     sc.stop();
     sc = null;
-    System.clearProperty("spark.driver.port");
   }

   int validatePrediction(List<LabeledPoint> validationData, SVMModel model) {
@@ -44,7 +44,6 @@ public void setUp() {
   public void tearDown() {
     sc.stop();
     sc = null;
-    System.clearProperty("spark.driver.port");
   }

   @Test
@@ -42,7 +42,6 @@ public void setUp() {
   public void tearDown() {
     sc.stop();
     sc = null;
-    System.clearProperty("spark.driver.port");
   }

   static void validatePrediction(MatrixFactorizationModel model, int users, int products, int features,
@@ -41,7 +41,6 @@ public void setUp() {
   public void tearDown() {
     sc.stop();
     sc = null;
-    System.clearProperty("spark.driver.port");
   }

   int validatePrediction(List<LabeledPoint> validationData, LassoModel model) {
@@ -41,7 +41,6 @@ public void setUp() {
   public void tearDown() {
     sc.stop();
     sc = null;
-    System.clearProperty("spark.driver.port");
   }

   int validatePrediction(List<LabeledPoint> validationData, LinearRegressionModel model) {
@@ -43,7 +43,6 @@ public void setUp() {
   public void tearDown() {
     sc.stop();
     sc = null;
-    System.clearProperty("spark.driver.port");
   }

   double predictionError(List<LabeledPoint> validationData, RidgeRegressionModel model) {
@@ -34,7 +34,6 @@ trait LocalSparkContext extends BeforeAndAfterAll { self: Suite =>
     if (sc != null) {
       sc.stop()
     }
-    System.clearProperty("spark.driver.port")
     super.afterAll()
   }
 }
4 changes: 0 additions & 4 deletions python/pyspark/tests.py
@@ -49,10 +49,6 @@ def setUp(self):
     def tearDown(self):
         self.sc.stop()
         sys.path = self._old_sys_path
-        # To avoid Akka rebinding to the same port, since it doesn't unbind
-        # immediately on shutdown
-        self.sc._jvm.System.clearProperty("spark.driver.port")
-


 class TestCheckpoint(PySparkTestCase):
2 changes: 0 additions & 2 deletions repl/src/test/scala/org/apache/spark/repl/ReplSuite.scala
@@ -50,8 +50,6 @@ class ReplSuite extends FunSuite {
     if (interp.sparkContext != null) {
       interp.sparkContext.stop()
     }
-    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
-    System.clearProperty("spark.driver.port")
     return out.toString
   }
@@ -58,7 +58,6 @@ class TestHiveContext(sc: SparkContext) extends LocalHiveContext(sc) {

   // By clearing the port we force Spark to pick a new one. This allows us to rerun tests
   // without restarting the JVM.
-  System.clearProperty("spark.driver.port")
   System.clearProperty("spark.hostPort")

   override lazy val warehousePath = getTempFilePath("sparkHiveWarehouse").getCanonicalPath
@@ -186,7 +186,6 @@ object MasterFailureTest extends Logging {
     setupCalled = true

     // Setup the streaming computation with the given operation
-    System.clearProperty("spark.driver.port")
     val ssc = new StreamingContext("local[4]", "MasterFailureTest", batchDuration, null, Nil,
       Map())
     ssc.checkpoint(checkpointDir.toString)
@@ -27,7 +27,6 @@ public abstract class LocalJavaStreamingContext {

     @Before
     public void setUp() {
-        System.clearProperty("spark.driver.port");
         System.setProperty("spark.streaming.clock", "org.apache.spark.streaming.util.ManualClock");
         ssc = new JavaStreamingContext("local[2]", "test", new Duration(1000));
         ssc.checkpoint("checkpoint");
@@ -37,8 +36,5 @@ public void setUp() {
     public void tearDown() {
         ssc.stop();
         ssc = null;
-
-        // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
-        System.clearProperty("spark.driver.port");
     }
 }
@@ -368,7 +368,6 @@ class CheckpointSuite extends TestSuiteBase {
       "\n-------------------------------------------\n"
     )
     ssc = new StreamingContext(checkpointDir)
-    System.clearProperty("spark.driver.port")
     ssc.start()
     val outputNew = advanceTimeWithRealDelay[V](ssc, nextNumBatches)
     // the first element will be re-processed data of the last batch before restart
@@ -153,8 +153,6 @@ trait TestSuiteBase extends FunSuite with BeforeAndAfter with Logging {
   // Default after function for any streaming test suite. Override this
   // if you want to add your stuff to "after" (i.e., don't call after { } )
   def afterFunction() {
-    // To avoid Akka rebinding to the same port, since it doesn't unbind immediately on shutdown
-    System.clearProperty("spark.driver.port")
     System.clearProperty("spark.streaming.clock")
   }