@@ -20,7 +20,7 @@ package org.apache.spark.streaming.flume
 import java.io.{ObjectOutput, ObjectInput, Externalizable}
 import java.net.InetSocketAddress
 import java.nio.ByteBuffer
-import java.util.concurrent.{TimeUnit, Executors}
+import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, Executors}
 
 import org.apache.spark.flume.sink.SparkSinkUtils
 
@@ -33,7 +33,7 @@ import org.apache.avro.ipc.specific.SpecificRequestor
 import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
 
 import org.apache.spark.Logging
-import org.apache.spark.flume.{EventBatch, SparkSinkEvent, SparkFlumeProtocol}
+import org.apache.spark.flume.{SparkSinkEvent, SparkFlumeProtocol}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.streaming.StreamingContext
 import org.apache.spark.streaming.dstream.ReceiverInputDStream
@@ -83,60 +83,64 @@ private[streaming] class FlumePollingReceiver(
   lazy val receiverExecutor = Executors.newFixedThreadPool(parallelism,
     new ThreadFactoryBuilder().setDaemon(true).setNameFormat("Flume Receiver Thread - %d").build())
 
-  private var connections = Array.empty[FlumeConnection] // temporarily empty, filled in later
+  private val connections = new LinkedBlockingQueue[FlumeConnection]()
 
   override def onStart(): Unit = {
     // Create the connections to each Flume agent.
-    connections = addresses.map(host => {
+    addresses.foreach(host => {
       val transceiver = new NettyTransceiver(host, channelFactory)
       val client = SpecificRequestor.getClient(classOf[SparkFlumeProtocol.Callback], transceiver)
-      new FlumeConnection(transceiver, client)
-    }).toArray
-
+      connections.add(new FlumeConnection(transceiver, client))
+    })
     for (i <- 0 until parallelism) {
       logInfo("Starting Flume Polling Receiver worker threads starting..")
       // Threads that pull data from Flume.
       receiverExecutor.submit(new Runnable {
         override def run(): Unit = {
-          var counter = i
           while (true) {
-            counter = counter % (connections.length)
-            val client = connections(counter).client
-            counter += 1
-            val eventBatch = client.getEventBatch(maxBatchSize)
-            if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
-              // No error, proceed with processing data
-              val seq = eventBatch.getSequenceNumber
-              val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
-              logDebug(
-                "Received batch of " + events.size() + " events with sequence number: " + seq)
-              try {
-                // Convert each Flume event to a serializable SparkPollingEvent
-                var j = 0
-                while (j < events.size()) {
-                  store(SparkFlumePollingEvent.fromSparkSinkEvent(events(j)))
-                  logDebug("Stored events with seq:" + seq)
-                  j += 1
-                }
-                logInfo("Sending ack for: " + seq)
-                // Send an ack to Flume so that Flume discards the events from its channels.
-                client.ack(seq)
-                logDebug("Ack sent for sequence number: " + seq)
-              } catch {
-                case e: Exception =>
-                  try {
-                    // Let Flume know that the events need to be pushed back into the channel.
-                    client.nack(seq) // If the agent is down, even this could fail and throw
-                  } catch {
-                    case e: Exception => logError(
-                      "Sending Nack also failed. A Flume agent is down.")
+            val connection = connections.poll()
+            val client = connection.client
+            try {
+              val eventBatch = client.getEventBatch(maxBatchSize)
+              if (!SparkSinkUtils.isErrorBatch(eventBatch)) {
+                // No error, proceed with processing data
+                val seq = eventBatch.getSequenceNumber
+                val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
+                logDebug(
+                  "Received batch of " + events.size() + " events with sequence number: " + seq)
+                try {
+                  // Convert each Flume event to a serializable SparkPollingEvent
+                  var j = 0
+                  while (j < events.size()) {
+                    store(SparkFlumePollingEvent.fromSparkSinkEvent(events(j)))
+                    logDebug("Stored events with seq:" + seq)
+                    j += 1
                   }
-                TimeUnit.SECONDS.sleep(2L) // for now just leave this as a fixed 2 seconds.
-                logWarning("Error while attempting to store events", e)
+                  logDebug("Sending ack for: " + seq)
+                  // Send an ack to Flume so that Flume discards the events from its channels.
+                  client.ack(seq)
+                  logDebug("Ack sent for sequence number: " + seq)
+                } catch {
+                  case e: Exception =>
+                    try {
+                      // Let Flume know that the events need to be pushed back into the channel.
+                      client.nack(seq) // If the agent is down, even this could fail and throw
+                    } catch {
+                      case e: Exception => logError(
+                        "Sending Nack also failed. A Flume agent is down.")
+                    }
+                    TimeUnit.SECONDS.sleep(2L) // for now just leave this as a fixed 2 seconds.
+                    logWarning("Error while attempting to store events", e)
+                }
+              } else {
+                logWarning("Did not receive events from Flume agent due to error on the Flume " +
+                  "agent: " + eventBatch.getErrorMsg)
               }
-            } else {
-              logWarning("Did not receive events from Flume agent due to error on the Flume " +
-                "agent: " + eventBatch.getErrorMsg)
+            } catch {
+              case e: Exception =>
+                logWarning("Error while reading data from Flume", e)
+            } finally {
+              connections.add(connection)
             }
           }
         }
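
For context, the core of this change is a connection check-out/return pattern: the receiver keeps its FlumeConnections in a LinkedBlockingQueue, each worker thread takes one out before polling a batch, and the finally block puts it back even when the read fails, so no two threads ever use the same connection at once. The snippet below is a minimal, self-contained sketch of that pattern only, not the actual Spark code; FakeClient, ConnectionPoolSketch, and the batch/loop sizes are made-up placeholders, and it uses the blocking take() where the patch itself uses poll().

import java.util.concurrent.{Executors, LinkedBlockingQueue, TimeUnit}

// Hypothetical stand-in for the Avro callback client wrapped by FlumeConnection.
class FakeClient(val id: Int) {
  def getEventBatch(max: Int): Seq[String] = (1 to max).map(n => s"event-$id-$n")
}

object ConnectionPoolSketch {
  def main(args: Array[String]): Unit = {
    val parallelism = 2
    // Pool of connections: take() checks one out, add() returns it.
    val connections = new LinkedBlockingQueue[FakeClient]()
    (1 to 3).foreach(i => connections.add(new FakeClient(i)))

    val workers = Executors.newFixedThreadPool(parallelism)
    for (_ <- 0 until parallelism) {
      workers.submit(new Runnable {
        override def run(): Unit = {
          for (_ <- 0 until 5) {
            val client = connections.take() // blocks until a connection is free
            try {
              val batch = client.getEventBatch(2)
              println(s"${Thread.currentThread().getName} read ${batch.size} events from client ${client.id}")
            } finally {
              connections.add(client) // always hand the connection back to the pool
            }
          }
        }
      })
    }
    workers.shutdown()
    workers.awaitTermination(10, TimeUnit.SECONDS)
  }
}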