Skip to content

Commit 0090553

Browse files
committed
[SPARK-4964] javafication of interfaces
1 parent 9a838c2 commit 0090553

File tree

4 files changed

+108
-86
lines changed

4 files changed

+108
-86
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.spark.streaming.kafka;
19+
20+
/** Something that has a collection of OffsetRanges */
21+
public interface HasOffsetRanges {
22+
/** array of OffsetRanges */
23+
public OffsetRange[] offsetRanges();
24+
}

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/Leader.scala renamed to external/kafka/src/main/java/org/apache/spark/streaming/kafka/Leader.java

Lines changed: 7 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -15,54 +15,19 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.spark.streaming.kafka
19-
20-
import kafka.common.TopicAndPartition
18+
package org.apache.spark.streaming.kafka;
2119

2220
/** Host info for the leader of a Kafka TopicAndPartition */
23-
24-
trait Leader {
25-
/** kafka topic name */
26-
def topic: String
21+
public interface Leader {
22+
/** kafka topic name */
23+
public String topic();
2724

2825
/** kafka partition id */
29-
def partition: Int
26+
public int partition();
3027

3128
/** kafka hostname */
32-
def host: String
29+
public String host();
3330

3431
/** kafka host's port */
35-
def port: Int
36-
}
37-
38-
private class LeaderImpl(
39-
override val topic: String,
40-
override val partition: Int,
41-
override val host: String,
42-
override val port: Int
43-
) extends Leader
44-
45-
object Leader {
46-
def create(
47-
topic: String,
48-
partition: Int,
49-
host: String,
50-
port: Int): Leader =
51-
new LeaderImpl(
52-
topic,
53-
partition,
54-
host,
55-
port)
56-
57-
def create(
58-
topicAndPartition: TopicAndPartition,
59-
host: String,
60-
port: Int): Leader =
61-
new LeaderImpl(
62-
topicAndPartition.topic,
63-
topicAndPartition.partition,
64-
host,
65-
port)
66-
32+
public int port();
6733
}
68-

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/OffsetRange.scala renamed to external/kafka/src/main/java/org/apache/spark/streaming/kafka/OffsetRange.java

Lines changed: 6 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -15,57 +15,19 @@
1515
* limitations under the License.
1616
*/
1717

18-
package org.apache.spark.streaming.kafka
19-
20-
import kafka.common.TopicAndPartition
18+
package org.apache.spark.streaming.kafka;
2119

2220
/** Represents a range of offsets from a single Kafka TopicAndPartition */
23-
trait OffsetRange {
21+
public interface OffsetRange {
2422
/** kafka topic name */
25-
def topic: String
23+
public String topic();
2624

2725
/** kafka partition id */
28-
def partition: Int
26+
public int partition();
2927

3028
/** inclusive starting offset */
31-
def fromOffset: Long
29+
public long fromOffset();
3230

3331
/** exclusive ending offset */
34-
def untilOffset: Long
35-
}
36-
37-
/** Something that has a collection of OffsetRanges */
38-
trait HasOffsetRanges {
39-
def offsetRanges: Array[OffsetRange]
40-
}
41-
42-
private class OffsetRangeImpl(
43-
override val topic: String,
44-
override val partition: Int,
45-
override val fromOffset: Long,
46-
override val untilOffset: Long
47-
) extends OffsetRange
48-
49-
object OffsetRange {
50-
def create(
51-
topic: String,
52-
partition: Int,
53-
fromOffset: Long,
54-
untilOffset: Long): OffsetRange =
55-
new OffsetRangeImpl(
56-
topic,
57-
partition,
58-
fromOffset,
59-
untilOffset)
60-
61-
def create(
62-
topicAndPartition: TopicAndPartition,
63-
fromOffset: Long,
64-
untilOffset: Long): OffsetRange =
65-
new OffsetRangeImpl(
66-
topicAndPartition.topic,
67-
topicAndPartition.partition,
68-
fromOffset,
69-
untilOffset)
70-
32+
public long untilOffset();
7133
}

external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -330,4 +330,75 @@ object KafkaUtils {
330330
ok => ok
331331
)
332332
}
333+
334+
private class OffsetRangeImpl(
335+
override val topic: String,
336+
override val partition: Int,
337+
override val fromOffset: Long,
338+
override val untilOffset: Long) extends OffsetRange
339+
340+
/**
341+
* Behaviorless container for a range of offsets from a single Kafka TopicAndPartition
342+
* @param topic kafka topic name
343+
* @param partition kafka partition id
344+
* @param fromOffset inclusive starting offset
345+
* @param untilOffset exclusive ending offset
346+
*/
347+
@Experimental
348+
def createOffsetRange(
349+
topic: String,
350+
partition: Int,
351+
fromOffset: Long,
352+
untilOffset: Long): OffsetRange =
353+
new OffsetRangeImpl(topic, partition, fromOffset, untilOffset)
354+
355+
/**
356+
* Behaviorless container for a range of offsets from a single Kafka TopicAndPartition
357+
* @param topicAndPartition kafka TopicAndPartition
358+
* @param fromOffset inclusive starting offset
359+
* @param untilOffset exclusive ending offset
360+
*/
361+
@Experimental
362+
def createOffsetRange(
363+
topicAndPartition: TopicAndPartition,
364+
fromOffset: Long,
365+
untilOffset: Long): OffsetRange =
366+
new OffsetRangeImpl(
367+
topicAndPartition.topic, topicAndPartition.partition, fromOffset, untilOffset)
368+
369+
private class LeaderImpl(
370+
override val topic: String,
371+
override val partition: Int,
372+
override val host: String,
373+
override val port: Int) extends Leader
374+
375+
/**
376+
* Behaviorless container of host info for the leader of a Kafka TopicAndPartition
377+
* @param topic kafka topic name
378+
* @param partition kafka partition id
379+
* @param host kafka hostname
380+
* @param port kafka host's port
381+
*/
382+
@Experimental
383+
def createLeader(topic: String, partition: Int, host: String, port: Int): Leader =
384+
new LeaderImpl(topic,partition,
385+
host,
386+
port)
387+
388+
/**
389+
* Behaviorless container of host info for the leader of a Kafka TopicAndPartition
390+
* @param topicAndPartition kafka TopicAndPartition
391+
* @param host kafka hostname
392+
* @param port kafka host's port
393+
*/
394+
@Experimental
395+
def createLeader(
396+
topicAndPartition: TopicAndPartition,
397+
host: String,
398+
port: Int): Leader =
399+
new LeaderImpl(
400+
topicAndPartition.topic,
401+
topicAndPartition.partition,
402+
host,
403+
port)
333404
}

0 commit comments

Comments
 (0)