Skip to content

Commit 4b0c7fc

Browse files
FLUME-1729. New Flume-Spark integration.
Avro does not support inheritance, so the error message needs to be part of the message itself.
1 parent bda01fc commit 4b0c7fc

File tree

4 files changed

+42
-67
lines changed

4 files changed

+42
-67
lines changed

external/flume-sink/src/main/avro/sparkflume.avdl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ protocol SparkFlumeProtocol {
2727
}
2828

2929
record EventBatch {
30+
string errorMsg = ""; // If this is empty it is a valid message, else it represents an error
3031
string sequenceNumber;
3132
array<SparkSinkEvent> events;
3233
}

external/flume-sink/src/main/scala/org/apache/spark/flume/ErrorEventBatch.scala

Lines changed: 0 additions & 28 deletions
This file was deleted.

external/flume-sink/src/main/scala/org/apache/spark/flume/sink/TransactionProcessor.scala

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ import java.util.concurrent.{TimeUnit, CountDownLatch, Callable}
2323
import scala.util.control.Breaks
2424

2525
import org.apache.flume.{Transaction, Channel}
26-
import org.apache.spark.flume.{SparkSinkEvent, ErrorEventBatch, EventBatch}
26+
import org.apache.spark.flume.{SparkSinkEvent, EventBatch}
2727
import org.slf4j.LoggerFactory
2828

2929

@@ -50,7 +50,8 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
5050
private val LOG = LoggerFactory.getLogger(classOf[TransactionProcessor])
5151

5252
// If a real batch is not returned, we always have to return an error batch.
53-
@volatile private var eventBatch: EventBatch = new ErrorEventBatch("Unknown Error")
53+
@volatile private var eventBatch: EventBatch = new EventBatch("Unknown Error", "",
54+
util.Collections.emptyList())
5455

5556
// Synchronization primitives
5657
val batchGeneratedLatch = new CountDownLatch(1)
@@ -70,7 +71,7 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
7071
/**
7172
* Get an event batch from the channel. This method will block until a batch of events is
7273
* available from the channel. If no events are available after a large number of attempts of
73-
* polling the channel, this method will return [[ErrorEventBatch]].
74+
* polling the channel, this method will return an [[EventBatch]] with a non-empty error message
7475
*
7576
* @return An [[EventBatch]] instance with sequence number set to seqNum, filled with a
7677
* maximum of maxBatchSize events
@@ -96,16 +97,14 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
9697

9798
/**
9899
* Populates events into the event batch. If the batch cannot be populated,
99-
* this method will not set the event batch which will stay [[ErrorEventBatch]]
100+
* this method will not set the events into the event batch, but it sets an error message.
100101
*/
101102
private def populateEvents() {
102103
try {
103104
txOpt = Option(channel.getTransaction)
104105
if(txOpt.isEmpty) {
105-
assert(eventBatch.isInstanceOf[ErrorEventBatch])
106-
eventBatch.asInstanceOf[ErrorEventBatch].message = "Something went wrong. Channel was " +
107-
"unable to create a transaction!"
108-
eventBatch
106+
eventBatch.setErrorMsg("Something went wrong. Channel was " +
107+
"unable to create a transaction!")
109108
}
110109
txOpt.map(tx => {
111110
tx.begin()
@@ -135,16 +134,16 @@ private class TransactionProcessor(val channel: Channel, val seqNum: String,
135134
val msg = "Tried several times, " +
136135
"but did not get any events from the channel!"
137136
LOG.warn(msg)
138-
eventBatch.asInstanceOf[ErrorEventBatch].message = msg
137+
eventBatch.setErrorMsg(msg)
139138
} else {
140139
// At this point, the events are available, so fill them into the event batch
141-
eventBatch = new EventBatch(seqNum, events)
140+
eventBatch = new EventBatch("",seqNum, events)
142141
}
143142
})
144143
} catch {
145144
case e: Exception =>
146145
LOG.error("Error while processing transaction.", e)
147-
eventBatch.asInstanceOf[ErrorEventBatch].message = e.getMessage
146+
eventBatch.setErrorMsg(e.getMessage)
148147
try {
149148
txOpt.map(tx => {
150149
rollbackAndClose(tx, close = true)

external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala

Lines changed: 31 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ import org.apache.avro.ipc.specific.SpecificRequestor
3131
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
3232

3333
import org.apache.spark.Logging
34-
import org.apache.spark.flume.{EventBatch, ErrorEventBatch, SparkSinkEvent, SparkFlumeProtocol}
34+
import org.apache.spark.flume.{EventBatch, SparkSinkEvent, SparkFlumeProtocol}
3535
import org.apache.spark.storage.StorageLevel
3636
import org.apache.spark.streaming.StreamingContext
3737
import org.apache.spark.streaming.dstream.ReceiverInputDStream
@@ -99,37 +99,40 @@ private[streaming] class FlumePollingReceiver(
9999
counter = counter % connections.size
100100
val client = connections(counter).client
101101
counter += 1
102-
client.getEventBatch(maxBatchSize) match {
103-
case errorBatch: ErrorEventBatch =>
104-
logWarning("Error Event Batch received from Spark Sink. " + errorBatch.message)
105-
case batch: EventBatch =>
106-
val seq = batch.getSequenceNumber
107-
val events: java.util.List[SparkSinkEvent] = batch.getEvents
108-
logDebug(
109-
"Received batch of " + events.size() + " events with sequence number: " + seq)
110-
try {
111-
// Convert each Flume event to a serializable SparkPollingEvent
112-
events.foreach(event => {
113-
store(SparkFlumePollingEvent.fromSparkSinkEvent(event))
114-
})
115-
// Send an ack to Flume so that Flume discards the events from its channels.
116-
client.ack(seq)
117-
} catch {
118-
case e: Throwable =>
119-
try {
120-
// Let Flume know that the events need to be pushed back into the channel.
121-
client.nack(seq) // If the agent is down, even this could fail and throw
122-
} catch {
123-
case e: Throwable => logError(
124-
"Sending Nack also failed. A Flume agent is down.")
125-
}
126-
TimeUnit.SECONDS.sleep(2L) // for now just leave this as a fixed 2 seconds.
127-
logWarning("Error while attempting to store events", e)
128-
}
102+
val eventBatch = client.getEventBatch(maxBatchSize)
103+
val errorMsg = eventBatch.getErrorMsg
104+
if (errorMsg.toString.equals("")) { // No error, proceed with processing data
105+
val seq = eventBatch.getSequenceNumber
106+
val events: java.util.List[SparkSinkEvent] = eventBatch.getEvents
107+
logDebug(
108+
"Received batch of " + events.size() + " events with sequence number: " + seq)
109+
try {
110+
// Convert each Flume event to a serializable SparkPollingEvent
111+
events.foreach(event => {
112+
store(SparkFlumePollingEvent.fromSparkSinkEvent(event))
113+
})
114+
// Send an ack to Flume so that Flume discards the events from its channels.
115+
client.ack(seq)
116+
} catch {
117+
case e: Exception =>
118+
try {
119+
// Let Flume know that the events need to be pushed back into the channel.
120+
client.nack(seq) // If the agent is down, even this could fail and throw
121+
} catch {
122+
case e: Exception => logError(
123+
"Sending Nack also failed. A Flume agent is down.")
124+
}
125+
TimeUnit.SECONDS.sleep(2L) // for now just leave this as a fixed 2 seconds.
126+
logWarning("Error while attempting to store events", e)
127+
}
128+
} else {
129+
logWarning("Did not receive events from Flume agent due to error on the Flume agent: " +
130+
"" + errorMsg.toString)
129131
}
130132
}
131133
}
132134
}
135+
133136
// Create multiple threads and start all of them.
134137
for (i <- 0 until parallelism) {
135138
logInfo("Starting Flume Polling Receiver worker threads starting..")

0 commit comments

Comments
 (0)