@@ -34,10 +34,8 @@ import org.apache.avro.ipc.NettyServer
 import org.apache.avro.ipc.specific.SpecificResponder
 import java.net.InetSocketAddress
 
-class SparkSink() extends AbstractSink with Configurable {
+class SparkSink extends AbstractSink with Configurable {
   private val LOG = LoggerFactory.getLogger(this.getClass)
-  private val lock = new ReentrantLock()
-  private val blockingCondition = lock.newCondition()
 
   // This sink will not persist sequence numbers and reuses them if it gets restarted.
   // So it is possible to commit a transaction which may have been meant for the sink before the
@@ -58,19 +56,20 @@ class SparkSink() extends AbstractSink with Configurable {
 
   private val processorMap = new ConcurrentHashMap[CharSequence, TransactionProcessor]()
 
-  private var processorFactory: Option[SparkHandlerFactory] = None
+  private var processorManager: Option[TransactionProcessorManager] = None
   private var hostname: String = SparkSinkConfig.DEFAULT_HOSTNAME
   private var port: Int = 0
   private var maxThreads: Int = SparkSinkConfig.DEFAULT_MAX_THREADS
   private var serverOpt: Option[NettyServer] = None
-  private var running = false
+
+  private val blockingLatch = new CountDownLatch(1)
 
   override def start() {
     transactionExecutorOpt = Option(Executors.newFixedThreadPool(numProcessors,
       new ThreadFactoryBuilder().setDaemon(true)
        .setNameFormat("Spark Sink, " + getName + " Processor Thread - %d").build()))
 
-    processorFactory = Option(new SparkHandlerFactory(numProcessors))
+    processorManager = Option(new TransactionProcessorManager(numProcessors))
 
    val responder = new SpecificResponder(classOf[SparkFlumeProtocol], new AvroCallbackHandler())
@@ -80,12 +79,6 @@ class SparkSink() extends AbstractSink with Configurable {
     serverOpt = Option(new NettyServer(responder, new InetSocketAddress(hostname, port)))
 
     serverOpt.map(server => server.start())
-    lock.lock()
-    try {
-      running = true
-    } finally {
-      lock.unlock()
-    }
     super.start()
   }
 
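The `running` flag guarded by a `ReentrantLock`/`Condition` pair is replaced by a single `CountDownLatch(1)`, which needs no explicit locking in `start()` at all. A minimal sketch of the latch pattern in isolation (the `LifecycleSink` class here is illustrative, not part of this patch):

```scala
import java.util.concurrent.CountDownLatch

// Hypothetical stand-in for the Flume sink lifecycle, showing only the
// blocking behavior that this patch moves onto a CountDownLatch.
class LifecycleSink {
  // Initialized to 1: await() blocks until countDown() is called once.
  private val blockingLatch = new CountDownLatch(1)

  def process(): Unit = {
    // Parks the calling thread without spinning; any number of threads may wait.
    blockingLatch.await()
  }

  def stop(): Unit = {
    // Releases all current and future await() callers; no lock or flag needed.
    blockingLatch.countDown()
  }
}
```

Unlike the lock-and-flag version, the latch cannot be reset, which matches a sink that is started and stopped once per process.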
@@ -95,65 +88,48 @@ class SparkSink() extends AbstractSink with Configurable {
       server.close()
       server.join()
     })
-    lock.lock()
-    try {
-      running = false
-      blockingCondition.signalAll()
-    } finally {
-      lock.unlock()
-    }
+    blockingLatch.countDown()
+    super.stop()
   }
 
   override def configure(ctx: Context) {
     import SparkSinkConfig._
     hostname = ctx.getString(CONF_HOSTNAME, DEFAULT_HOSTNAME)
-    val portOpt = Option(ctx.getInteger(CONF_PORT))
-    if (portOpt.isDefined) {
-      port = portOpt.get
-    } else {
-      throw new ConfigurationException("The Port to bind must be specified")
-    }
+    port = Option(ctx.getInteger(CONF_PORT)).
+      getOrElse(throw new ConfigurationException("The port to bind to must be specified"))
     numProcessors = ctx.getInteger(PROCESSOR_COUNT, DEFAULT_PROCESSOR_COUNT)
     transactionTimeout = ctx.getInteger(CONF_TRANSACTION_TIMEOUT, DEFAULT_TRANSACTION_TIMEOUT)
     maxThreads = ctx.getInteger(CONF_MAX_THREADS, DEFAULT_MAX_THREADS)
   }
 
   override def process(): Status = {
     // This method is called in a loop by the Flume framework - block it until the sink is
-    // stopped to save CPU resources
-    lock.lock()
-    try {
-      while (running) {
-        blockingCondition.await()
-      }
-    } finally {
-      lock.unlock()
-    }
+    // stopped to save CPU resources. The sink runner will interrupt this thread when the sink is
+    // being shut down.
+    blockingLatch.await()
     Status.BACKOFF
   }
 
 
   // Object representing an empty batch returned by the txn processor due to some error.
   case object ErrorEventBatch extends EventBatch
 
-  private class AvroCallbackHandler() extends SparkFlumeProtocol {
+  private class AvroCallbackHandler extends SparkFlumeProtocol {
 
     override def getEventBatch(n: Int): EventBatch = {
-      val processor = processorFactory.get.checkOut(n)
+      val processor = processorManager.get.checkOut(n)
       transactionExecutorOpt.map(executor => executor.submit(processor))
       // Wait until a batch is available - can be null if some error was thrown
-      val eventBatch = processor.eventQueue.take()
-      eventBatch match {
+      processor.eventQueue.take() match {
         case ErrorEventBatch => throw new FlumeException("Something went wrong. No events" +
           " retrieved from channel.")
-        case events => {
-          processorMap.put(events.getSequenceNumber, processor)
+        case eventBatch: EventBatch =>
+          processorMap.put(eventBatch.getSequenceNumber, processor)
           if (LOG.isDebugEnabled) {
-            LOG.debug("Sent " + events.getEventBatch.size() +
-              " events with sequence number: " + events.getSequenceNumber)
+            LOG.debug("Sent " + eventBatch.getEvents.size() +
+              " events with sequence number: " + eventBatch.getSequenceNumber)
           }
-          events
-        }
+          eventBatch
       }
     }
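Two idioms in this hunk are worth calling out. `port = Option(ctx.getInteger(CONF_PORT)).getOrElse(throw ...)` type-checks because a `throw` expression has type `Nothing`, which conforms to `Int`. And `ErrorEventBatch` acts as a sentinel: a `case object` extending `EventBatch` can be matched ahead of the general typed case. A self-contained sketch of that dispatch, with a simplified hypothetical `Batch` hierarchy standing in for the Avro-generated types:

```scala
// Simplified stand-ins for EventBatch and its error sentinel.
sealed trait Batch
case object ErrorBatch extends Batch                // sentinel for channel errors
final case class DataBatch(events: List[String]) extends Batch

def dispatch(batch: Batch): List[String] = batch match {
  // The sentinel must come first; the broader pattern below would match it too.
  case ErrorBatch        => throw new RuntimeException("No events retrieved from channel.")
  case DataBatch(events) => events
}
```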
@@ -214,41 +190,23 @@ class SparkSink() extends AbstractSink with Configurable {
       tx.begin()
       try {
         eventBatch.setSequenceNumber(seqBase + seqNum.incrementAndGet())
-        val events = eventBatch.getEventBatch
+        val events = eventBatch.getEvents
         events.clear()
         val loop = new Breaks
         var gotEventsInThisTxn = false
         loop.breakable {
-          var i = 0
-          // Using for here causes the maxBatchSize change to be ineffective as the Range gets
-          // pregenerated
-          while (i < maxBatchSize) {
-            i += 1
-            val eventOpt = Option(getChannel.take())
-            eventOpt.map(event => {
-              events.add(new SparkSinkEvent(toCharSequenceMap(event
-                .getHeaders),
-                ByteBuffer.wrap(event.getBody)))
-              gotEventsInThisTxn = true
-            })
-            if (eventOpt.isEmpty) {
-              if (!gotEventsInThisTxn) {
-                // To avoid sending empty batches, we wait till events are available backing off
-                // between attempts to get events. Each attempt to get an event though causes one
-                // iteration to be lost. To ensure that we still send back maxBatchSize number of
-                // events, we cheat and increase the maxBatchSize by 1 to account for the lost
-                // iteration. Even throwing an exception is expensive as Avro will serialize it
-                // and send it over the wire, which is useless. Before incrementing though,
-                // ensure that we are not anywhere near INT_MAX.
-                if (maxBatchSize >= Int.MaxValue / 2) {
-                  // Random sanity check
-                  throw new RuntimeException("Safety exception - polled too many times, no events!")
+          while (events.size() < maxBatchSize) {
+            Option(getChannel.take()) match {
+              case Some(event) =>
+                events.add(new SparkSinkEvent(toCharSequenceMap(event.getHeaders),
+                  ByteBuffer.wrap(event.getBody)))
+                gotEventsInThisTxn = true
+              case None =>
+                if (!gotEventsInThisTxn) {
+                  Thread.sleep(500)
+                } else {
+                  loop.break()
                 }
-                maxBatchSize += 1
-                Thread.sleep(500)
-              } else {
-                loop.break()
-              }
             }
           }
         }
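The rewrite bounds the loop on `events.size()` instead of a manually incremented counter, which removes the `maxBatchSize += 1` compensation (and its `Int.MaxValue / 2` guard) entirely: an empty `take()` no longer consumes one of the batch's iterations. A standalone sketch of the same poll-with-backoff shape, with a mutable queue standing in as a hypothetical channel (names are illustrative):

```scala
import scala.collection.mutable
import scala.util.control.Breaks

// `channel` stands in for Flume's Channel; dequeueFirst mimics take() which
// may return nothing when the channel is empty.
def drain(channel: mutable.Queue[String], maxBatchSize: Int): Seq[String] = {
  val events = mutable.ArrayBuffer.empty[String]
  var gotEventsInThisTxn = false
  val loop = new Breaks
  loop.breakable {
    // Bounding on events.size (not a loop counter) means an empty poll no
    // longer "costs" an iteration, so no counter compensation is needed.
    while (events.size < maxBatchSize) {
      channel.dequeueFirst(_ => true) match {
        case Some(event) =>
          events += event
          gotEventsInThisTxn = true
        case None =>
          if (!gotEventsInThisTxn) Thread.sleep(500) // back off until data arrives
          else loop.break()                          // ship the partial batch
      }
    }
  }
  events.toSeq
}
```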
@@ -284,7 +242,7 @@ class SparkSink() extends AbstractSink with Configurable {
       } finally {
         resultQueueUpdateLock.unlock()
       }
-      eventBatch.getEventBatch.clear()
+      eventBatch.getEvents.clear()
       // If the batch failed on spark side, throw a FlumeException
       maybeResult.map(success =>
         if (!success) {
@@ -315,7 +273,7 @@ class SparkSink() extends AbstractSink with Configurable {
       // remove the event from the map and then clear the value
       resultQueue.clear()
       processorMap.remove(eventBatch.getSequenceNumber)
-      processorFactory.get.checkIn(this)
+      processorManager.get.checkIn(this)
       tx.close()
     }
   }
@@ -328,7 +286,7 @@ class SparkSink() extends AbstractSink with Configurable {
     }
   }
 
-  private class SparkHandlerFactory(val maxInstances: Int) {
+  private class TransactionProcessorManager(val maxInstances: Int) {
     val queue = new scala.collection.mutable.Queue[TransactionProcessor]
     val queueModificationLock = new ReentrantLock()
     var currentSize = 0
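The renamed `TransactionProcessorManager` is a small bounded object pool: `checkOut` hands out a processor and `checkIn` returns it. The body of `checkOut` is outside this hunk, so the following is a hedged sketch of the check-out/check-in shape only; it assumes the pool blocks when exhausted, and it omits the transaction-timeout wiring the real class does per processor:

```scala
import java.util.concurrent.locks.ReentrantLock
import scala.collection.mutable

// Illustrative bounded pool, not the actual TransactionProcessorManager.
class BoundedPool[T](maxInstances: Int)(create: () => T) {
  private val queue = new mutable.Queue[T]
  private val lock = new ReentrantLock()
  private val available = lock.newCondition()
  private var currentSize = 0

  def checkOut(): T = {
    lock.lock()
    try {
      if (queue.nonEmpty) queue.dequeue()
      else if (currentSize < maxInstances) { currentSize += 1; create() }
      else {
        // Pool exhausted: wait for a checkIn to hand an instance back.
        while (queue.isEmpty) available.await()
        queue.dequeue()
      }
    } finally lock.unlock()
  }

  def checkIn(instance: T): Unit = {
    lock.lock()
    try {
      queue.enqueue(instance)
      available.signal()
    } finally lock.unlock()
  }
}
```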