Commit 120e2a1

SPARK-1729. Some test changes and changes to utils classes.
1 parent 9fd0da7 commit 120e2a1

File tree: 6 files changed (+175, -61 lines)


external/flume-sink/pom.xml

Lines changed: 5 additions & 0 deletions
@@ -64,6 +64,11 @@
         </exclusion>
       </exclusions>
     </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+      <version>2.10.4</version>
+    </dependency>
   </dependencies>
   <build>
     <outputDirectory>target/scala-${scala.binary.version}/classes</outputDirectory>

external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkAvroCallbackHandler.scala

Lines changed: 2 additions & 3 deletions
@@ -64,10 +64,9 @@ private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
     })
     // Wait until a batch is available - will be an error if error message is non-empty
     val batch = processor.getEventBatch
-    if (batch.getErrorMsg != null && !batch.getErrorMsg.equals("")) {
-      processorMap.put(sequenceNumber, processor)
+    if (!SparkSinkUtils.isErrorBatch(batch)) {
+      processorMap.put(sequenceNumber.toString, processor)
     }
-
     batch
   }

external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSinkUtils.scala (new file)

Lines changed: 30 additions & 0 deletions
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.flume.sink
+
+import org.apache.spark.flume.EventBatch
+
+object SparkSinkUtils {
+  /**
+   * This method determines if this batch represents an error or not.
+   * @param batch - The batch to check
+   * @return - true if the batch represents an error
+   */
+  def isErrorBatch(batch: EventBatch): Boolean = {
+    !batch.getErrorMsg.toString.equals("") // If there is an error message, it is an error batch.
+  }
+}
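
Since this helper is the hinge for the behavior changes in the other files, here is a condensed, illustrative sketch (not part of the patch; the object name and comments are hypothetical) of the pattern both callers follow: act on a batch only when isErrorBatch reports no error.

    import org.apache.spark.flume.EventBatch
    import org.apache.spark.flume.sink.SparkSinkUtils

    object ErrorBatchCheckSketch {
      // Decide what to do with a batch pulled from the Spark Sink.
      def handleBatch(batch: EventBatch): Unit = {
        if (!SparkSinkUtils.isErrorBatch(batch)) {
          // No error: process the events and ack the sequence number.
        } else {
          // Error batch: log the sink-side error and skip (or nack) the batch.
          println("Error from sink: " + batch.getErrorMsg)
        }
      }
    }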

external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala

Lines changed: 46 additions & 42 deletions
@@ -22,6 +22,8 @@ import java.net.InetSocketAddress
 import java.nio.ByteBuffer
 import java.util.concurrent.{TimeUnit, Executors}
 
+import org.apache.spark.flume.sink.SparkSinkUtils
+
 import scala.collection.JavaConversions._
 import scala.reflect.ClassTag
 
@@ -91,52 +93,54 @@ private[streaming] class FlumePollingReceiver(
       new FlumeConnection(transceiver, client)
     }).toArray
 
-    // Threads that pull data from Flume.
-    val dataReceiver = new Runnable {
-      override def run(): Unit = {
-        var counter = 0
-        while (true) {
-          counter = counter % connections.size
-          val client = connections(counter).client
-          counter += 1
-          val eventBatch = client.getEventBatch(maxBatchSize)
-          val errorMsg = eventBatch.getErrorMsg
-          if (errorMsg.toString.equals("")) { // No error, proceed with processing data
-            val seq = eventBatch.getSequenceNumber
-            val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
-            logDebug(
-              "Received batch of " + events.size() + " events with sequence number: " + seq)
-            try {
-              // Convert each Flume event to a serializable SparkPollingEvent
-              events.foreach(event => {
-                store(SparkFlumePollingEvent.fromSparkSinkEvent(event))
-              })
-              // Send an ack to Flume so that Flume discards the events from its channels.
-              client.ack(seq)
-            } catch {
-              case e: Exception =>
-                try {
-                  // Let Flume know that the events need to be pushed back into the channel.
-                  client.nack(seq) // If the agent is down, even this could fail and throw
-                } catch {
-                  case e: Exception => logError(
-                    "Sending Nack also failed. A Flume agent is down.")
+    for (i <- 0 until parallelism) {
+      logInfo("Starting Flume Polling Receiver worker threads starting..")
+      // Threads that pull data from Flume.
+      receiverExecutor.submit(new Runnable {
+        override def run(): Unit = {
+          var counter = i
+          while (true) {
+            counter = counter % (connections.length)
+            val client = connections(counter).client
+            counter += 1
+            val eventBatch = client.getEventBatch(maxBatchSize)
+            if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
+              // No error, proceed with processing data
+              val seq = eventBatch.getSequenceNumber
+              val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
+              logDebug(
+                "Received batch of " + events.size() + " events with sequence number: " + seq)
+              try {
+                // Convert each Flume event to a serializable SparkPollingEvent
+                var j = 0
+                while (j < events.size()) {
+                  store(SparkFlumePollingEvent.fromSparkSinkEvent(events(j)))
+                  logDebug("Stored events with seq:" + seq)
+                  j += 1
                 }
-                TimeUnit.SECONDS.sleep(2L) // for now just leave this as a fixed 2 seconds.
-                logWarning("Error while attempting to store events", e)
+                logInfo("Sending ack for: " +seq)
+                // Send an ack to Flume so that Flume discards the events from its channels.
+                client.ack(seq)
+                logDebug("Ack sent for sequence number: " + seq)
+              } catch {
+                case e: Exception =>
+                  try {
+                    // Let Flume know that the events need to be pushed back into the channel.
+                    client.nack(seq) // If the agent is down, even this could fail and throw
+                  } catch {
+                    case e: Exception => logError(
+                      "Sending Nack also failed. A Flume agent is down.")
+                  }
+                  TimeUnit.SECONDS.sleep(2L) // for now just leave this as a fixed 2 seconds.
+                  logWarning("Error while attempting to store events", e)
+              }
+            } else {
+              logWarning("Did not receive events from Flume agent due to error on the Flume " +
+                "agent: " + eventBatch.getErrorMsg)
             }
-          } else {
-            logWarning("Did not receive events from Flume agent due to error on the Flume agent: " +
-              "" + errorMsg.toString)
           }
         }
-      }
-    }
-
-    // Create multiple threads and start all of them.
-    for (i <- 0 until parallelism) {
-      logInfo("Starting Flume Polling Receiver worker threads starting..")
-      receiverExecutor.submit(dataReceiver)
+      })
     }
   }
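
The rewritten receiver above starts `parallelism` worker threads that round-robin over the open connections, each worker beginning at its own offset. A standalone sketch of that scheduling pattern (the names and values below are placeholders, not part of the patch):

    object RoundRobinSketch {
      def main(args: Array[String]): Unit = {
        val connections = Array("sink-0", "sink-1", "sink-2") // placeholder sink addresses
        val parallelism = 2
        for (i <- 0 until parallelism) {
          var counter = i // each worker starts at a different connection
          for (_ <- 0 until 4) { // a few polling iterations per worker
            counter = counter % connections.length
            val client = connections(counter) // pick the next sink in the cycle
            counter += 1
            println("worker " + i + " polls " + client)
          }
        }
      }
    }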

external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala

Lines changed: 77 additions & 10 deletions
@@ -59,7 +59,7 @@ object FlumeUtils {
     ): ReceiverInputDStream[SparkFlumeEvent] = {
     val inputStream = new FlumeInputDStream[SparkFlumeEvent](
       ssc, hostname, port, storageLevel, enableDecompression)
-
+
     inputStream
   }
 
@@ -109,6 +109,39 @@ object FlumeUtils {
     createStream(jssc.ssc, hostname, port, storageLevel, enableDecompression)
   }
 
+  /**
+   * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+   * This stream will poll the sink for data and will pull events as they are available.
+   * This stream will use a batch size of 100 events and run 5 threads to pull data.
+   * @param host The address of the host on which the Spark Sink is running
+   * @param port The port that the host is listening on
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def createPollingStream(
+      ssc: StreamingContext,
+      host: String,
+      port: Int,
+      storageLevel: StorageLevel
+    ): ReceiverInputDStream[SparkFlumePollingEvent] = {
+    new FlumePollingInputDStream[SparkFlumePollingEvent](ssc,
+      Seq(new InetSocketAddress(host, port)), 100, 5, storageLevel)
+  }
+
+  /**
+   * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+   * This stream will poll the sink for data and will pull events as they are available.
+   * This stream will use a batch size of 100 events and run 5 threads to pull data.
+   * @param addresses List of InetSocketAddresses representing the hosts to connect to.
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def createPollingStream (
+      ssc: StreamingContext,
+      addresses: Seq[InetSocketAddress],
+      storageLevel: StorageLevel
+    ): ReceiverInputDStream[SparkFlumePollingEvent] = {
+    new FlumePollingInputDStream[SparkFlumePollingEvent](ssc, addresses, 100, 5, storageLevel)
+  }
+
   /**
    * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
    * This stream will poll the sink for data and will pull events as they are available.
@@ -123,14 +156,48 @@ object FlumeUtils {
   def createPollingStream (
       ssc: StreamingContext,
       addresses: Seq[InetSocketAddress],
-      maxBatchSize: Int = 100,
-      parallelism: Int = 5,
-      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+      maxBatchSize: Int,
+      parallelism: Int,
+      storageLevel: StorageLevel
    ): ReceiverInputDStream[SparkFlumePollingEvent] = {
     new FlumePollingInputDStream[SparkFlumePollingEvent](ssc, addresses, maxBatchSize,
       parallelism, storageLevel)
   }
 
+  /**
+   * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+   * This stream will poll the sink for data and will pull events as they are available.
+   * This stream will use a batch size of 100 events and run 5 threads to pull data.
+   * @param addresses List of InetSocketAddresses representing the hosts to connect to.
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def createPollingStream (
+      jssc: JavaStreamingContext,
+      addresses: Seq[InetSocketAddress],
+      storageLevel: StorageLevel
+    ): ReceiverInputDStream[SparkFlumePollingEvent] = {
+    new FlumePollingInputDStream[SparkFlumePollingEvent](jssc.ssc, addresses, 100, 5,
+      StorageLevel.MEMORY_AND_DISK_SER_2)
+  }
+
+  /**
+   * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
+   * This stream will poll the sink for data and will pull events as they are available.
+   * This stream will use a batch size of 100 events and run 5 threads to pull data.
+   * @param host The address of the host on which the Spark Sink is running
+   * @param port The port that the host is listening on
+   * @param storageLevel Storage level to use for storing the received objects
+   */
+  def createPollingStream(
+      jssc: JavaStreamingContext,
+      host: String,
+      port: Int,
+      storageLevel: StorageLevel
+    ): ReceiverInputDStream[SparkFlumePollingEvent] = {
+    new FlumePollingInputDStream[SparkFlumePollingEvent](jssc.ssc,
+      Seq(new InetSocketAddress(host, port)), 100, 5, storageLevel)
+  }
+
   /**
    * Creates an input stream that is to be used with the Spark Sink deployed on a Flume agent.
    * This stream will poll the sink for data and will pull events as they are available.
@@ -142,14 +209,14 @@ object FlumeUtils {
    * result in this stream using more threads
    * @param storageLevel Storage level to use for storing the received objects
    */
-  def createJavaPollingStream (
-      ssc: StreamingContext,
+  def createPollingStream (
+      jssc: JavaStreamingContext,
       addresses: Seq[InetSocketAddress],
-      maxBatchSize: Int = 100,
-      parallelism: Int = 5,
-      storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
+      maxBatchSize: Int,
+      parallelism: Int,
+      storageLevel: StorageLevel
    ): JavaReceiverInputDStream[SparkFlumePollingEvent] = {
-    new FlumePollingInputDStream[SparkFlumePollingEvent](ssc, addresses, maxBatchSize,
+    new FlumePollingInputDStream[SparkFlumePollingEvent](jssc.ssc, addresses, maxBatchSize,
       parallelism, storageLevel)
   }
 }
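
A minimal usage sketch of the new polling API, assuming a Spark Sink is running on a Flume agent at localhost:9999 (both placeholder values) and that the spark-streaming-flume artifacts from this branch are on the classpath:

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.flume.FlumeUtils

    object FlumePollingExample {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("FlumePollingExample")
        val ssc = new StreamingContext(conf, Seconds(2))
        // Poll the sink with the defaults documented above: batch size 100, 5 threads.
        val stream = FlumeUtils.createPollingStream(
          ssc, "localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)
        stream.count().print() // print how many events arrived in each batch
        ssc.start()
        ssc.awaitTermination()
      }
    }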

external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingReceiverSuite.scala

Lines changed: 15 additions & 6 deletions
@@ -35,15 +35,15 @@ import org.apache.spark.streaming.dstream.ReceiverInputDStream
 import org.apache.spark.streaming.util.ManualClock
 import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingContext}
 
-class FlumePollingReceiverSuite extends TestSuiteBase {
+class FlumePollingReceiverSuite extends TestSuiteBase {
 
   val testPort = 9999
 
   test("flume polling test") {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
     val flumeStream: ReceiverInputDStream[SparkFlumePollingEvent] =
-      FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort)), 100, 5,
+      FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort)), 100, 1,
         StorageLevel.MEMORY_AND_DISK)
     val outputBuffer = new ArrayBuffer[Seq[SparkFlumePollingEvent]]
       with SynchronizedBuffer[Seq[SparkFlumePollingEvent]]
@@ -66,6 +66,7 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
     sink.start()
     ssc.start()
     writeAndVerify(Seq(channel), ssc, outputBuffer)
+    assertQueuesAreEmpty(channel)
     sink.stop()
     channel.stop()
   }
@@ -75,7 +76,7 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
     val ssc = new StreamingContext(conf, batchDuration)
     val flumeStream: ReceiverInputDStream[SparkFlumePollingEvent] =
       FlumeUtils.createPollingStream(ssc, Seq(new InetSocketAddress("localhost", testPort),
-        new InetSocketAddress("localhost", testPort + 1)), 100, 5,
+        new InetSocketAddress("localhost", testPort + 1)), 100, 2,
         StorageLevel.MEMORY_AND_DISK)
     val outputBuffer = new ArrayBuffer[Seq[SparkFlumePollingEvent]]
       with SynchronizedBuffer[Seq[SparkFlumePollingEvent]]
@@ -108,9 +109,10 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
     sink2.start()
     ssc.start()
     writeAndVerify(Seq(channel, channel2), ssc, outputBuffer)
+    assertQueuesAreEmpty(channel)
+    assertQueuesAreEmpty(channel2)
     sink.stop()
     channel.stop()
-
   }
 
   def writeAndVerify(channels: Seq[MemoryChannel], ssc: StreamingContext,
@@ -126,12 +128,12 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
     }
     val startTime = System.currentTimeMillis()
     while (outputBuffer.size < 5 * channels.size &&
-      System.currentTimeMillis() - startTime < maxWaitTimeMillis) {
+      System.currentTimeMillis() - startTime < 15000) {
       logInfo("output.size = " + outputBuffer.size)
       Thread.sleep(100)
     }
     val timeTaken = System.currentTimeMillis() - startTime
-    assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
+    assert(timeTaken < 15000, "Operation timed out after " + timeTaken + " ms")
     logInfo("Stopping context")
     ssc.stop()
 
@@ -158,6 +160,13 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
     assert(counter === 25 * channels.size)
   }
 
+  def assertQueuesAreEmpty(channel: MemoryChannel) = {
+    val queueRemaining = channel.getClass.getDeclaredField("queueRemaining");
+    queueRemaining.setAccessible(true)
+    val m = queueRemaining.get(channel).getClass.getDeclaredMethod("availablePermits")
+    assert(m.invoke(queueRemaining.get(channel)).asInstanceOf[Int] === 5000)
+  }
+
   private class TxnSubmitter(channel: MemoryChannel, clock: ManualClock) extends Callable[Void] {
     override def call(): Void = {
       var t = 0
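
For context on the assertQueuesAreEmpty helper added above: it reaches into the MemoryChannel's private queueRemaining semaphore via reflection and asserts that all permits are available again, which indicates the receiver drained and acked every event (5000 appears to be the channel capacity the test configures). A standalone sketch of the same reflection pattern, using a hypothetical Holder class in place of the Flume channel:

    import java.util.concurrent.Semaphore

    // Hypothetical stand-in for MemoryChannel: a private Semaphore field,
    // analogous to the channel's queueRemaining.
    class Holder {
      private val queueRemaining = new Semaphore(5000)
    }

    object ReflectionSketch {
      def main(args: Array[String]): Unit = {
        val holder = new Holder
        val field = holder.getClass.getDeclaredField("queueRemaining")
        field.setAccessible(true) // bypass the private modifier
        val sem = field.get(holder).asInstanceOf[Semaphore]
        println(sem.availablePermits()) // prints 5000: nothing is queued
      }
    }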
