Commit 22730ad

[SPARK-10547] [TEST] Streamline / improve style of Java API tests
Fix a few Java API test style issues: unused generic types, exceptions, wrong assert argument order.

Author: Sean Owen <[email protected]>

Closes #8706 from srowen/SPARK-10547.
1 parent 8285e3b commit 22730ad
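
The commit message names three recurring fixes: redundant generic type arguments, unnecessary "throws Exception" clauses on overrides, and swapped assertEquals arguments. As a rough guide to the diffs below, here is a minimal sketch of each pattern in one hypothetical test (not code from this commit):

import java.util.HashMap;
import java.util.Map;

import org.junit.Assert;
import org.junit.Test;

public class StylePatternsSuite {

  // Stand-in for interfaces like Spark's Function, whose call() declares
  // "throws Exception" even though implementations often throw nothing.
  interface Fn<T, R> {
    R call(T t) throws Exception;
  }

  @Test
  public void patterns() throws Exception {
    // 1. Redundant generic types: declare against the interface type and
    //    let the diamond operator (Java 7+) infer the type arguments,
    //    instead of: HashMap<String, String> params = new HashMap<String, String>();
    Map<String, String> params = new HashMap<>();
    params.put("k", "v");

    // 2. Exceptions: an override may drop "throws Exception" when its body
    //    throws no checked exception, instead of: call(String key) throws Exception
    Fn<String, String> lookup = new Fn<String, String>() {
      @Override
      public String call(String key) {
        return params.get(key);
      }
    };

    // 3. Assert argument order: JUnit's contract is assertEquals(expected,
    //    actual), so failure messages read the right way around,
    //    instead of: Assert.assertEquals(lookup.call("k"), "v");
    Assert.assertEquals("v", lookup.call("k"));
  }
}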

File tree

15 files changed: +755 -761 lines changed
core/src/test/java/org/apache/spark/JavaAPISuite.java

Lines changed: 224 additions & 227 deletions
Large diffs are not rendered by default.

external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaDirectKafkaStreamSuite.java

Lines changed: 12 additions & 12 deletions

@@ -75,11 +75,11 @@ public void testKafkaStream() throws InterruptedException {
     String[] topic1data = createTopicAndSendData(topic1);
     String[] topic2data = createTopicAndSendData(topic2);

-    HashSet<String> sent = new HashSet<String>();
+    Set<String> sent = new HashSet<>();
     sent.addAll(Arrays.asList(topic1data));
     sent.addAll(Arrays.asList(topic2data));

-    HashMap<String, String> kafkaParams = new HashMap<String, String>();
+    Map<String, String> kafkaParams = new HashMap<>();
     kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress());
     kafkaParams.put("auto.offset.reset", "smallest");
@@ -95,17 +95,17 @@ public void testKafkaStream() throws InterruptedException {
       // Make sure you can get offset ranges from the rdd
       new Function<JavaPairRDD<String, String>, JavaPairRDD<String, String>>() {
         @Override
-        public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) throws Exception {
+        public JavaPairRDD<String, String> call(JavaPairRDD<String, String> rdd) {
           OffsetRange[] offsets = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
           offsetRanges.set(offsets);
-          Assert.assertEquals(offsets[0].topic(), topic1);
+          Assert.assertEquals(topic1, offsets[0].topic());
           return rdd;
         }
       }
     ).map(
       new Function<Tuple2<String, String>, String>() {
         @Override
-        public String call(Tuple2<String, String> kv) throws Exception {
+        public String call(Tuple2<String, String> kv) {
           return kv._2();
         }
       }
@@ -119,10 +119,10 @@ public String call(Tuple2<String, String> kv) throws Exception {
         StringDecoder.class,
         String.class,
         kafkaParams,
-        topicOffsetToMap(topic2, (long) 0),
+        topicOffsetToMap(topic2, 0L),
         new Function<MessageAndMetadata<String, String>, String>() {
           @Override
-          public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception {
+          public String call(MessageAndMetadata<String, String> msgAndMd) {
             return msgAndMd.message();
           }
         }
@@ -133,7 +133,7 @@ public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception
     unifiedStream.foreachRDD(
       new Function<JavaRDD<String>, Void>() {
         @Override
-        public Void call(JavaRDD<String> rdd) throws Exception {
+        public Void call(JavaRDD<String> rdd) {
           result.addAll(rdd.collect());
           for (OffsetRange o : offsetRanges.get()) {
             System.out.println(
@@ -155,14 +155,14 @@ public Void call(JavaRDD<String> rdd) throws Exception {
     ssc.stop();
   }

-  private HashSet<String> topicToSet(String topic) {
-    HashSet<String> topicSet = new HashSet<String>();
+  private static Set<String> topicToSet(String topic) {
+    Set<String> topicSet = new HashSet<>();
     topicSet.add(topic);
     return topicSet;
   }

-  private HashMap<TopicAndPartition, Long> topicOffsetToMap(String topic, Long offsetToStart) {
-    HashMap<TopicAndPartition, Long> topicMap = new HashMap<TopicAndPartition, Long>();
+  private static Map<TopicAndPartition, Long> topicOffsetToMap(String topic, Long offsetToStart) {
+    Map<TopicAndPartition, Long> topicMap = new HashMap<>();
     topicMap.put(new TopicAndPartition(topic, 0), offsetToStart);
     return topicMap;
   }
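
Note the easy-to-miss fix above: Assert.assertEquals(offsets[0].topic(), topic1) became assertEquals(topic1, offsets[0].topic()). JUnit's signature is assertEquals(expected, actual); with the arguments swapped, a failure is reported backwards. A minimal standalone sketch (hypothetical test, not from this commit):

import org.junit.Assert;
import org.junit.Test;

public class AssertOrderSuite {

  private String actualTopic() {
    return "topic1";  // the value produced by the code under test
  }

  @Test
  public void argumentOrder() {
    // assertEquals(expected, actual): the constant the test expects goes
    // first. With the arguments swapped the assertion passes or fails the
    // same way, but a failure would print something like
    // "expected:<topic2> but was:<topic1>" -- blaming the test's constant
    // instead of the code under test.
    Assert.assertEquals("topic1", actualTopic());
  }
}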

external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaRDDSuite.java

Lines changed: 9 additions & 8 deletions

@@ -19,6 +19,7 @@

 import java.io.Serializable;
 import java.util.HashMap;
+import java.util.Map;

 import scala.Tuple2;
@@ -66,19 +67,19 @@ public void testKafkaRDD() throws InterruptedException {
     String topic1 = "topic1";
     String topic2 = "topic2";

-    String[] topic1data = createTopicAndSendData(topic1);
-    String[] topic2data = createTopicAndSendData(topic2);
+    createTopicAndSendData(topic1);
+    createTopicAndSendData(topic2);

-    HashMap<String, String> kafkaParams = new HashMap<String, String>();
+    Map<String, String> kafkaParams = new HashMap<>();
     kafkaParams.put("metadata.broker.list", kafkaTestUtils.brokerAddress());

     OffsetRange[] offsetRanges = {
       OffsetRange.create(topic1, 0, 0, 1),
       OffsetRange.create(topic2, 0, 0, 1)
     };

-    HashMap<TopicAndPartition, Broker> emptyLeaders = new HashMap<TopicAndPartition, Broker>();
-    HashMap<TopicAndPartition, Broker> leaders = new HashMap<TopicAndPartition, Broker>();
+    Map<TopicAndPartition, Broker> emptyLeaders = new HashMap<>();
+    Map<TopicAndPartition, Broker> leaders = new HashMap<>();
     String[] hostAndPort = kafkaTestUtils.brokerAddress().split(":");
     Broker broker = Broker.create(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
     leaders.put(new TopicAndPartition(topic1, 0), broker);
@@ -95,7 +96,7 @@ public void testKafkaRDD() throws InterruptedException {
     ).map(
       new Function<Tuple2<String, String>, String>() {
         @Override
-        public String call(Tuple2<String, String> kv) throws Exception {
+        public String call(Tuple2<String, String> kv) {
           return kv._2();
         }
       }
@@ -113,7 +114,7 @@ public String call(Tuple2<String, String> kv) throws Exception {
         emptyLeaders,
         new Function<MessageAndMetadata<String, String>, String>() {
           @Override
-          public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception {
+          public String call(MessageAndMetadata<String, String> msgAndMd) {
             return msgAndMd.message();
           }
         }
@@ -131,7 +132,7 @@ public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception
         leaders,
         new Function<MessageAndMetadata<String, String>, String>() {
           @Override
-          public String call(MessageAndMetadata<String, String> msgAndMd) throws Exception {
+          public String call(MessageAndMetadata<String, String> msgAndMd) {
             return msgAndMd.message();
           }
         }
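
Besides dropping the unused topic1data/topic2data locals, the recurring change in this file is declaring collections against their interface type with the diamond operator. A minimal sketch of the idea (hypothetical values, not from this commit):

import java.util.HashMap;
import java.util.Map;

public class InterfaceTypedDeclarations {
  public static void main(String[] args) {
    // Before: the concrete class appears on both sides and the type
    // arguments are spelled out twice.
    HashMap<String, String> before = new HashMap<String, String>();
    before.put("metadata.broker.list", "localhost:9092");

    // After: callers depend only on the Map interface, and the diamond
    // operator (Java 7+) infers the type arguments.
    Map<String, String> after = new HashMap<>();
    after.put("metadata.broker.list", "localhost:9092");

    System.out.println(after.equals(before));  // true
  }
}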

external/kafka/src/test/java/org/apache/spark/streaming/kafka/JavaKafkaStreamSuite.java

Lines changed: 7 additions & 7 deletions

@@ -67,18 +67,18 @@ public void tearDown() {
   @Test
   public void testKafkaStream() throws InterruptedException {
     String topic = "topic1";
-    HashMap<String, Integer> topics = new HashMap<String, Integer>();
+    Map<String, Integer> topics = new HashMap<>();
     topics.put(topic, 1);

-    HashMap<String, Integer> sent = new HashMap<String, Integer>();
+    Map<String, Integer> sent = new HashMap<>();
     sent.put("a", 5);
     sent.put("b", 3);
     sent.put("c", 10);

     kafkaTestUtils.createTopic(topic);
     kafkaTestUtils.sendMessages(topic, sent);

-    HashMap<String, String> kafkaParams = new HashMap<String, String>();
+    Map<String, String> kafkaParams = new HashMap<>();
     kafkaParams.put("zookeeper.connect", kafkaTestUtils.zkAddress());
     kafkaParams.put("group.id", "test-consumer-" + random.nextInt(10000));
     kafkaParams.put("auto.offset.reset", "smallest");
@@ -97,7 +97,7 @@ public void testKafkaStream() throws InterruptedException {
     JavaDStream<String> words = stream.map(
       new Function<Tuple2<String, String>, String>() {
         @Override
-        public String call(Tuple2<String, String> tuple2) throws Exception {
+        public String call(Tuple2<String, String> tuple2) {
           return tuple2._2();
         }
       }
@@ -106,7 +106,7 @@ public String call(Tuple2<String, String> tuple2) throws Exception {
     words.countByValue().foreachRDD(
       new Function<JavaPairRDD<String, Long>, Void>() {
         @Override
-        public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
+        public Void call(JavaPairRDD<String, Long> rdd) {
           List<Tuple2<String, Long>> ret = rdd.collect();
           for (Tuple2<String, Long> r : ret) {
             if (result.containsKey(r._1())) {
@@ -130,8 +130,8 @@ public Void call(JavaPairRDD<String, Long> rdd) throws Exception {
       Thread.sleep(200);
     }
     Assert.assertEquals(sent.size(), result.size());
-    for (String k : sent.keySet()) {
-      Assert.assertEquals(sent.get(k).intValue(), result.get(k).intValue());
+    for (Map.Entry<String, Integer> e : sent.entrySet()) {
+      Assert.assertEquals(e.getValue().intValue(), result.get(e.getKey()).intValue());
     }
   }
 }
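
The last hunk replaces keySet() iteration with entrySet(), which yields each key and value together instead of paying for a second map lookup per key. A standalone sketch of the difference (hypothetical data):

import java.util.HashMap;
import java.util.Map;

public class EntrySetIteration {
  public static void main(String[] args) {
    Map<String, Integer> sent = new HashMap<>();
    sent.put("a", 5);
    sent.put("b", 3);
    sent.put("c", 10);

    // Before: every iteration does an extra get(k) hash lookup.
    for (String k : sent.keySet()) {
      System.out.println(k + " -> " + sent.get(k));
    }

    // After: the entry carries both key and value in a single pass.
    for (Map.Entry<String, Integer> e : sent.entrySet()) {
      System.out.println(e.getKey() + " -> " + e.getValue());
    }
  }
}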

external/twitter/src/test/java/org/apache/spark/streaming/twitter/JavaTwitterStreamSuite.java

Lines changed: 1 addition & 3 deletions

@@ -17,8 +17,6 @@

 package org.apache.spark.streaming.twitter;

-import java.util.Arrays;
-
 import org.junit.Test;
 import twitter4j.Status;
 import twitter4j.auth.Authorization;
@@ -30,7 +28,7 @@
 public class JavaTwitterStreamSuite extends LocalJavaStreamingContext {
   @Test
   public void testTwitterStream() {
-    String[] filters = (String[])Arrays.<String>asList("filter1", "filter2").toArray();
+    String[] filters = { "filter1", "filter2" };
     Authorization auth = NullAuthorization.getInstance();

     // tests the API, does not actually test data receiving
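
The fix above replaces a List round-trip, with an unchecked cast that leans on an implementation detail of Arrays.asList, by a plain array initializer. A sketch of the two forms; the before case is shown here with the safe toArray(new String[0]) overload so it compiles on any JDK:

import java.util.Arrays;

public class ArrayLiterals {
  public static void main(String[] args) {
    // Before (roughly): build a List only to convert it back to an array.
    String[] before = Arrays.asList("filter1", "filter2").toArray(new String[0]);

    // After: an array initializer creates the String[] directly.
    String[] after = { "filter1", "filter2" };

    System.out.println(Arrays.equals(before, after));  // true
  }
}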

extras/java8-tests/src/test/java/org/apache/spark/Java8APISuite.java

Lines changed: 25 additions & 21 deletions

@@ -77,7 +77,7 @@ public void call(String s) {
   public void foreach() {
     foreachCalls = 0;
     JavaRDD<String> rdd = sc.parallelize(Arrays.asList("Hello", "World"));
-    rdd.foreach((x) -> foreachCalls++);
+    rdd.foreach(x -> foreachCalls++);
     Assert.assertEquals(2, foreachCalls);
   }
@@ -180,7 +180,7 @@ public void map() {
     JavaPairRDD<Integer, Integer> pairs = rdd.mapToPair(x -> new Tuple2<>(x, x))
       .cache();
     pairs.collect();
-    JavaRDD<String> strings = rdd.map(x -> x.toString()).cache();
+    JavaRDD<String> strings = rdd.map(Object::toString).cache();
     strings.collect();
   }
@@ -195,7 +195,9 @@ public void flatMap() {

     JavaPairRDD<String, String> pairs = rdd.flatMapToPair(s -> {
       List<Tuple2<String, String>> pairs2 = new LinkedList<>();
-      for (String word : s.split(" ")) pairs2.add(new Tuple2<>(word, word));
+      for (String word : s.split(" ")) {
+        pairs2.add(new Tuple2<>(word, word));
+      }
       return pairs2;
     });
@@ -204,11 +206,12 @@ public void flatMap() {

     JavaDoubleRDD doubles = rdd.flatMapToDouble(s -> {
       List<Double> lengths = new LinkedList<>();
-      for (String word : s.split(" ")) lengths.add(word.length() * 1.0);
+      for (String word : s.split(" ")) {
+        lengths.add((double) word.length());
+      }
       return lengths;
     });

-    Double x = doubles.first();
     Assert.assertEquals(5.0, doubles.first(), 0.01);
     Assert.assertEquals(11, pairs.count());
   }
@@ -228,7 +231,7 @@ public void mapsFromPairsToPairs() {
     swapped.collect();

     // There was never a bug here, but it's worth testing:
-    pairRDD.map(item -> item.swap()).collect();
+    pairRDD.map(Tuple2::swap).collect();
   }

   @Test
@@ -282,11 +285,11 @@ public void zipPartitions() {
     FlatMapFunction2<Iterator<Integer>, Iterator<String>, Integer> sizesFn =
       (Iterator<Integer> i, Iterator<String> s) -> {
         int sizeI = 0;
-        int sizeS = 0;
         while (i.hasNext()) {
           sizeI += 1;
           i.next();
         }
+        int sizeS = 0;
         while (s.hasNext()) {
           sizeS += 1;
           s.next();
@@ -301,30 +304,31 @@ public void zipPartitions() {
   public void accumulators() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5));

-    final Accumulator<Integer> intAccum = sc.intAccumulator(10);
-    rdd.foreach(x -> intAccum.add(x));
+    Accumulator<Integer> intAccum = sc.intAccumulator(10);
+    rdd.foreach(intAccum::add);
     Assert.assertEquals((Integer) 25, intAccum.value());

-    final Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0);
+    Accumulator<Double> doubleAccum = sc.doubleAccumulator(10.0);
     rdd.foreach(x -> doubleAccum.add((double) x));
     Assert.assertEquals((Double) 25.0, doubleAccum.value());

     // Try a custom accumulator type
     AccumulatorParam<Float> floatAccumulatorParam = new AccumulatorParam<Float>() {
+      @Override
       public Float addInPlace(Float r, Float t) {
         return r + t;
       }
-
+      @Override
       public Float addAccumulator(Float r, Float t) {
         return r + t;
       }
-
+      @Override
       public Float zero(Float initialValue) {
         return 0.0f;
       }
     };

-    final Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam);
+    Accumulator<Float> floatAccum = sc.accumulator(10.0f, floatAccumulatorParam);
     rdd.foreach(x -> floatAccum.add((float) x));
     Assert.assertEquals((Float) 25.0f, floatAccum.value());
@@ -336,7 +340,7 @@ public Float zero(Float initialValue) {
   @Test
   public void keyBy() {
     JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1, 2));
-    List<Tuple2<String, Integer>> s = rdd.keyBy(x -> x.toString()).collect();
+    List<Tuple2<String, Integer>> s = rdd.keyBy(Object::toString).collect();
     Assert.assertEquals(new Tuple2<>("1", 1), s.get(0));
     Assert.assertEquals(new Tuple2<>("2", 2), s.get(1));
   }
@@ -349,7 +353,7 @@ public void mapOnPairRDD() {
     JavaPairRDD<Integer, Integer> rdd3 =
       rdd2.mapToPair(in -> new Tuple2<>(in._2(), in._1()));
     Assert.assertEquals(Arrays.asList(
-      new Tuple2<Integer, Integer>(1, 1),
+      new Tuple2<>(1, 1),
       new Tuple2<>(0, 2),
       new Tuple2<>(1, 3),
       new Tuple2<>(0, 4)), rdd3.collect());
@@ -361,7 +365,7 @@ public void collectPartitions() {

     JavaPairRDD<Integer, Integer> rdd2 =
       rdd1.mapToPair(i -> new Tuple2<>(i, i % 2));
-    List[] parts = rdd1.collectPartitions(new int[]{0});
+    List<Integer>[] parts = rdd1.collectPartitions(new int[]{0});
     Assert.assertEquals(Arrays.asList(1, 2), parts[0]);

     parts = rdd1.collectPartitions(new int[]{1, 2});
@@ -371,19 +375,19 @@ public void collectPartitions() {
     Assert.assertEquals(Arrays.asList(new Tuple2<>(1, 1), new Tuple2<>(2, 0)),
       rdd2.collectPartitions(new int[]{0})[0]);

-    parts = rdd2.collectPartitions(new int[]{1, 2});
-    Assert.assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), parts[0]);
+    List<Tuple2<Integer, Integer>>[] parts2 = rdd2.collectPartitions(new int[]{1, 2});
+    Assert.assertEquals(Arrays.asList(new Tuple2<>(3, 1), new Tuple2<>(4, 0)), parts2[0]);
     Assert.assertEquals(Arrays.asList(new Tuple2<>(5, 1), new Tuple2<>(6, 0), new Tuple2<>(7, 1)),
-      parts[1]);
+      parts2[1]);
   }

   @Test
   public void collectAsMapWithIntArrayValues() {
     // Regression test for SPARK-1040
-    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(new Integer[]{1}));
+    JavaRDD<Integer> rdd = sc.parallelize(Arrays.asList(1));
     JavaPairRDD<Integer, int[]> pairRDD =
       rdd.mapToPair(x -> new Tuple2<>(x, new int[]{x}));
     pairRDD.collect(); // Works fine
-    Map<Integer, int[]> map = pairRDD.collectAsMap(); // Used to crash with ClassCastException
+    pairRDD.collectAsMap(); // Used to crash with ClassCastException
   }
 }
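
Several hunks above replace single-call lambdas such as x -> x.toString() and item -> item.swap() with method references. A minimal sketch outside Spark, using plain java.util.stream:

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class MethodReferences {
  public static void main(String[] args) {
    List<Integer> nums = Arrays.asList(1, 2);

    // Lambda form: a one-expression wrapper around an existing method.
    List<String> viaLambda = nums.stream()
        .map(x -> x.toString())
        .collect(Collectors.toList());

    // Method-reference form: names the invoked method directly.
    List<String> viaRef = nums.stream()
        .map(Object::toString)
        .collect(Collectors.toList());

    System.out.println(viaLambda.equals(viaRef));  // true
  }
}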
