
Commit d24d9d4

SPARK-1729. Make Flume pull data from source, rather than the current push model
This updates the previous patch to fix some error cases and to exclude the Netty dependencies from the build; the unit tests are also updated.
1 parent 6d6776a commit d24d9d4
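
What the pull model looks like from the consuming application's side is easiest to see through the FlumeUtils.createPollingStream API exercised in the test below. A minimal usage sketch, assuming the six-argument call shape from the updated test (StreamingContext, host, port, what I read as max batch size and parallelism, storage level) and a SparkSink already running on localhost:9999; the host/port, app name, and printing logic are illustrative assumptions, not part of this commit:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.flume.FlumeUtils  // package assumed from the test suite's location

val conf = new SparkConf().setAppName("FlumePollingSketch")
val ssc = new StreamingContext(conf, Seconds(2))

// Poll the Spark sink instead of having Flume push events to a receiver port.
// 100 and 1 mirror the values used in the updated test.
val stream = FlumeUtils.createPollingStream(ssc, "localhost", 9999, 100, 1,
  StorageLevel.MEMORY_AND_DISK)

// SparkPollingEvent wraps the Avro event; getBody returns a ByteBuffer.
stream.map(e => new String(e.event.getBody.array(), "utf-8")).print()

ssc.start()
ssc.awaitTermination()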

3 files changed: +78 −40 lines


external/flume-sink/src/main/scala/org/apache/spark/flume/sink/SparkSink.scala

Lines changed: 50 additions & 23 deletions
@@ -30,9 +30,8 @@ import java.util.concurrent._
 import java.util
 import org.apache.flume.conf.{ConfigurationException, Configurable}
 import com.google.common.util.concurrent.ThreadFactoryBuilder
-import org.apache.avro.ipc.{NettyTransceiver, NettyServer}
+import org.apache.avro.ipc.NettyServer
 import org.apache.avro.ipc.specific.SpecificResponder
-import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
 import java.net.InetSocketAddress
 
 class SparkSink() extends AbstractSink with Configurable {
@@ -75,12 +74,10 @@ class SparkSink() extends AbstractSink with Configurable {
 
     val responder = new SpecificResponder(classOf[SparkFlumeProtocol], new AvroCallbackHandler())
 
-    serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port),
-      new NioServerSocketChannelFactory(
-        Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat(
-          "Spark Sink " + classOf[NettyTransceiver].getSimpleName + " Boss-%d").build),
-        Executors.newFixedThreadPool(maxThreads, new ThreadFactoryBuilder().setNameFormat(
-          "Spark Sink " + classOf[NettyTransceiver].getSimpleName + " I/O Worker-%d").build))))
+    // Using the constructor that takes specific thread-pools requires bringing in netty
+    // dependencies which are being excluded in the build. In practice,
+    // Netty dependencies are already available on the JVM as Flume would have pulled them in.
+    serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port)))
 
     serverOpt.map(server => server.start())
     lock.lock()
@@ -93,10 +90,14 @@ class SparkSink() extends AbstractSink with Configurable {
   }
 
   override def stop() {
+    transactionExecutorOpt.map(executor => executor.shutdownNow())
+    serverOpt.map(server => {
+      server.close()
+      server.join()
+    })
     lock.lock()
     try {
       running = false
-      transactionExecutorOpt.map(executor => executor.shutdownNow())
       blockingCondition.signalAll()
     } finally {
       lock.unlock()
@@ -131,23 +132,28 @@ class SparkSink() extends AbstractSink with Configurable {
     Status.BACKOFF
   }
 
+
+  // Object representing an empty batch returned by the txn processor due to some error.
+  case object ErrorEventBatch extends EventBatch
+
   private class AvroCallbackHandler() extends SparkFlumeProtocol {
 
     override def getEventBatch(n: Int): EventBatch = {
       val processor = processorFactory.get.checkOut(n)
       transactionExecutorOpt.map(executor => executor.submit(processor))
       // Wait until a batch is available - can be null if some error was thrown
-      val eventBatch = Option(processor.eventQueue.take())
-      if (eventBatch.isDefined) {
-        val eventsToBeSent = eventBatch.get
-        processorMap.put(eventsToBeSent.getSequenceNumber, processor)
-        if (LOG.isDebugEnabled) {
-          LOG.debug("Sent " + eventsToBeSent.getEventBatch.size() +
-            " events with sequence number: " + eventsToBeSent.getSequenceNumber)
+      val eventBatch = processor.eventQueue.take()
+      eventBatch match {
+        case ErrorEventBatch => throw new FlumeException("Something went wrong. No events" +
+          " retrieved from channel.")
+        case events => {
+          processorMap.put(events.getSequenceNumber, processor)
+          if (LOG.isDebugEnabled) {
+            LOG.debug("Sent " + events.getEventBatch.size() +
+              " events with sequence number: " + events.getSequenceNumber)
+          }
+          events
         }
-        eventsToBeSent
-      } else {
-        throw new FlumeException("Error while trying to retrieve events from the channel.")
       }
     }
 
@@ -211,17 +217,38 @@ class SparkSink() extends AbstractSink with Configurable {
       val events = eventBatch.getEventBatch
       events.clear()
       val loop = new Breaks
+      var gotEventsInThisTxn = false
      loop.breakable {
-        for (i <- 0 until maxBatchSize) {
+        var i = 0
+        // Using for here causes the maxBatchSize change to be ineffective as the Range gets
+        // pregenerated
+        while (i < maxBatchSize) {
+          i += 1
          val eventOpt = Option(getChannel.take())
-
          eventOpt.map(event => {
            events.add(new SparkSinkEvent(toCharSequenceMap(event
              .getHeaders),
              ByteBuffer.wrap(event.getBody)))
+            gotEventsInThisTxn = true
          })
          if (eventOpt.isEmpty) {
-            loop.break()
+            if (!gotEventsInThisTxn) {
+              // To avoid sending empty batches, we wait till events are available backing off
+              // between attempts to get events. Each attempt to get an event though causes one
+              // iteration to be lost. To ensure that we still send back maxBatchSize number of
+              // events, we cheat and increase the maxBatchSize by 1 to account for the lost
+              // iteration. Even throwing an exception is expensive as Avro will serialize it
+              // and send it over the wire, which is useless. Before incrementing though,
+              // ensure that we are not anywhere near INT_MAX.
+              if (maxBatchSize >= Int.MaxValue / 2) {
+                // Random sanity check
+                throw new RuntimeException("Safety exception - polled too many times, no events!")
+              }
+              maxBatchSize += 1
+              Thread.sleep(500)
+            } else {
+              loop.break()
+            }
          }
        }
      }
@@ -283,7 +310,7 @@ class SparkSink() extends AbstractSink with Configurable {
          null // No point rethrowing the exception
        } finally {
          // Must *always* release the caller thread
-          eventQueue.put(null)
+          eventQueue.put(ErrorEventBatch)
          // In the case of success coming after the timeout, but before resetting the seq number
          // remove the event from the map and then clear the value
          resultQueue.clear()
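
The changes above replace a null handoff between the transaction processor and the Avro callback handler with a typed sentinel: the processor always puts something on processor.eventQueue, and getEventBatch pattern-matches on ErrorEventBatch to turn a failed transaction into a FlumeException on the caller's side instead of dereferencing null. A stripped-down sketch of that handoff pattern, using hypothetical stand-in types rather than the EventBatch classes in this file:

import java.util.concurrent.{Executors, LinkedBlockingQueue}

// Hypothetical stand-ins for EventBatch / ErrorEventBatch
sealed trait Batch
case class GoodBatch(items: Seq[String]) extends Batch
case object ErrorBatch extends Batch // sentinel: worker failed, nothing to send

val queue = new LinkedBlockingQueue[Batch]()
val worker = Executors.newSingleThreadExecutor()

worker.submit(new Runnable {
  override def run(): Unit = {
    try {
      // pretend to drain a channel; a real failure would skip this put
      queue.put(GoodBatch(Seq("e1", "e2")))
    } catch {
      case _: Exception => queue.put(ErrorBatch) // must always release the caller
    }
  }
})

// The caller blocks until the worker hands over either a batch or the sentinel.
queue.take() match {
  case ErrorBatch => throw new RuntimeException("No events retrieved from channel.")
  case GoodBatch(items) => println(s"Got ${items.size} events")
}
worker.shutdown()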

external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingReceiverSuite.scala

Lines changed: 28 additions & 16 deletions
@@ -22,7 +22,6 @@ import org.apache.spark.streaming.{TestSuiteBase, TestOutputStream, StreamingCon
 import org.apache.spark.storage.StorageLevel
 import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer}
 import org.apache.spark.streaming.util.ManualClock
-import java.nio.charset.Charset
 import org.apache.flume.channel.MemoryChannel
 import org.apache.flume.Context
 import org.apache.flume.conf.Configurables
@@ -39,7 +38,7 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
     // Set up the streaming context and input streams
     val ssc = new StreamingContext(conf, batchDuration)
     val flumeStream: ReceiverInputDStream[SparkPollingEvent] =
-      FlumeUtils.createPollingStream(ssc, "localhost", testPort, 100, 5,
+      FlumeUtils.createPollingStream(ssc, "localhost", testPort, 100, 1,
        StorageLevel.MEMORY_AND_DISK)
     val outputBuffer = new ArrayBuffer[Seq[SparkPollingEvent]]
       with SynchronizedBuffer[Seq[SparkPollingEvent]]
@@ -63,15 +62,17 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
     ssc.start()
 
     val clock = ssc.scheduler.clock.asInstanceOf[ManualClock]
-    val input = Seq(1, 2, 3, 4, 5)
+    var t = 0
     for (i <- 0 until 5) {
       val tx = channel.getTransaction
       tx.begin()
-      for (j <- 0 until input.size) {
+      for (j <- 0 until 5) {
        channel.put(EventBuilder.withBody(
-          (String.valueOf(i) + input(j)).getBytes("utf-8"),
-          Map[String, String]("test-" + input(j).toString -> "header")))
+          String.valueOf(t).getBytes("utf-8"),
+          Map[String, String]("test-" + t.toString -> "header")))
+        t += 1
       }
+
       tx.commit()
       tx.close()
       Thread.sleep(500) // Allow some time for the events to reach
@@ -86,19 +87,30 @@ class FlumePollingReceiverSuite extends TestSuiteBase {
     assert(timeTaken < maxWaitTimeMillis, "Operation timed out after " + timeTaken + " ms")
     logInfo("Stopping context")
     ssc.stop()
+    sink.stop()
+    channel.stop()
 
-    val decoder = Charset.forName("UTF-8").newDecoder()
-
-    assert(outputBuffer.size === 5)
+    val flattenedBuffer = outputBuffer.flatten
+    assert(flattenedBuffer.size === 25)
     var counter = 0
-    for (i <- 0 until outputBuffer.size;
-         j <- 0 until outputBuffer(i).size) {
-      counter += 1
-      val eventToVerify = outputBuffer(i)(j).event
-      val str = decoder.decode(eventToVerify.getBody)
-      assert(str.toString === (String.valueOf(i) + input(j)))
-      assert(eventToVerify.getHeaders.get("test-" + input(j).toString) === "header")
+    for (i <- 0 until 25) {
+      val eventToVerify = EventBuilder.withBody(
+        String.valueOf(i).getBytes("utf-8"),
+        Map[String, String]("test-" + i.toString -> "header"))
+      var found = false
+      var j = 0
+      while (j < flattenedBuffer.size && !found) {
+        val strToCompare = new String(flattenedBuffer(j).event.getBody.array(), "utf-8")
+        if (new String(eventToVerify.getBody, "utf-8") == strToCompare &&
+          eventToVerify.getHeaders.get("test-" + i.toString)
+            .equals(flattenedBuffer(j).event.getHeaders.get("test-" + i.toString))) {
+          found = true
+          counter += 1
+        }
+        j += 1
+      }
     }
+    assert (counter === 25)
   }
 
 }
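
The rewritten test pushes 25 distinct event bodies ("0" through "24") into the channel across five transactions and then checks that each body arrives exactly once, independent of how the sink batches them. The transactional put it relies on is standard Flume channel usage; a minimal self-contained sketch of that pattern follows (the capacity settings and the single event are assumptions for illustration, and the header map is converted explicitly rather than via an implicit conversion as in the test):

import org.apache.flume.Context
import org.apache.flume.channel.MemoryChannel
import org.apache.flume.conf.Configurables
import org.apache.flume.event.EventBuilder
import scala.collection.JavaConverters._

val channel = new MemoryChannel()
val context = new Context()
context.put("capacity", "1000")              // assumed channel sizing
context.put("transactionCapacity", "100")
Configurables.configure(channel, context)
channel.start()

// Every put must happen inside a begin/commit pair on a channel transaction.
val tx = channel.getTransaction
tx.begin()
channel.put(EventBuilder.withBody("0".getBytes("utf-8"),
  Map("test-0" -> "header").asJava))
tx.commit()
tx.close()

channel.stop()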

project/SparkBuild.scala

Lines changed: 0 additions & 1 deletion
@@ -23,7 +23,6 @@ import sbt.Keys._
 import sbt.Task
 import sbtassembly.Plugin._
 import AssemblyKeys._
-import sbtavro.SbtAvro._
 import scala.Some
 import scala.util.Properties
 import org.scalastyle.sbt.ScalastylePlugin.{Settings => ScalaStyleSettings}

0 commit comments
