Skip to content

Commit bda01fc

Browse files
FLUME-1729. Flume-Spark integration.
Refactoring classes into new files and minor changes in protocol.
1 parent 0d69604 commit bda01fc

File tree

5 files changed

+415
-325
lines changed

5 files changed

+415
-325
lines changed
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.flume
18+
19+
import java.util
20+
21+
/**
22+
* Object representing an empty batch returned by the txn processor due to some error.
23+
*/
24+
/**
 * Batch handed back to Spark when the transaction processor could not produce
 * real events (e.g. a channel error). The error text travels in [[message]].
 */
case class ErrorEventBatch(var message: String) extends EventBatch {
  // Initialize the inherited fields eagerly so they are never null when the
  // batch is serialized over Avro.
  setSequenceNumber("")
  setEvents(util.Collections.emptyList())
}
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.spark.flume.sink
18+
19+
import java.util.concurrent.atomic.AtomicLong
20+
import java.util.concurrent.{ConcurrentHashMap, Executors}
21+
22+
import com.google.common.util.concurrent.ThreadFactoryBuilder
23+
24+
import org.apache.commons.lang.RandomStringUtils
25+
import org.apache.flume.Channel
26+
import org.apache.spark.flume.{EventBatch, SparkFlumeProtocol}
27+
import org.slf4j.LoggerFactory
28+
29+
/**
30+
* Class that implements the SparkFlumeProtocol, that is used by the Avro Netty Server to process
31+
* requests. Each getEvents, ack and nack call is forwarded to an instance of this class.
32+
* @param threads Number of threads to use to process requests.
33+
* @param channel The channel that the sink pulls events from
34+
* @param transactionTimeout Timeout in millis after which the transaction if not acked by Spark
35+
* is rolled back.
36+
*/
37+
private class SparkAvroCallbackHandler(val threads: Int, val channel: Channel,
  val transactionTimeout: Int, val backOffInterval: Int) extends SparkFlumeProtocol {
  private val LOG = LoggerFactory.getLogger(classOf[SparkAvroCallbackHandler])
  val transactionExecutorOpt = Option(Executors.newFixedThreadPool(threads,
    new ThreadFactoryBuilder().setDaemon(true)
      .setNameFormat("Spark Sink Processor Thread - %d").build()))
  // In-flight transactions keyed by sequence number, so that a later ack/nack
  // can locate the processor that owns the corresponding channel transaction.
  // Keys are always java.lang.String (see removeAndGetProcessor).
  private val processorMap = new ConcurrentHashMap[CharSequence, TransactionProcessor]()
  // This sink will not persist sequence numbers and reuses them if it gets restarted.
  // So it is possible to commit a transaction which may have been meant for the sink before the
  // restart.
  // Since the new txn may not have the same sequence number we must guard against accidentally
  // committing a new transaction. To reduce the probability of that happening a random string is
  // prepended to the sequence number. Does not change for life of sink
  private val seqBase = RandomStringUtils.randomAlphanumeric(8)
  private val seqCounter = new AtomicLong(0)

  /**
   * Returns a bunch of events to Spark over Avro RPC.
   * @param n Maximum number of events to return in a batch
   * @return [[EventBatch]] instance that has a sequence number and an array of at most n events
   */
  override def getEventBatch(n: Int): EventBatch = {
    val sequenceNumber = seqBase + seqCounter.incrementAndGet()
    val processor = new TransactionProcessor(channel, sequenceNumber,
      n, transactionTimeout, backOffInterval, this)
    // Track the processor before submitting it. Without this put, the map was
    // never populated, so ack/nack could never find the processor and the
    // channel transaction was neither committed nor rolled back.
    processorMap.put(sequenceNumber, processor)
    transactionExecutorOpt.foreach(_.submit(processor))
    // Blocks until the processor has a batch ready; on failure this may be an
    // error batch rather than real events.
    processor.getEventBatch
  }

  /**
   * Called by Spark to indicate successful commit of a batch
   * @param sequenceNumber The sequence number of the event batch that was successful
   */
  override def ack(sequenceNumber: CharSequence): Void = {
    completeTransaction(sequenceNumber, success = true)
    null
  }

  /**
   * Called by Spark to indicate failed commit of a batch
   * @param sequenceNumber The sequence number of the event batch that failed
   * @return
   */
  override def nack(sequenceNumber: CharSequence): Void = {
    completeTransaction(sequenceNumber, success = false)
    LOG.info("Spark failed to commit transaction. Will reattempt events.")
    null
  }

  /**
   * Helper method to commit or rollback a transaction.
   * @param sequenceNumber The sequence number of the batch that was completed
   * @param success Whether the batch was successful or not.
   */
  private def completeTransaction(sequenceNumber: CharSequence, success: Boolean) {
    // The processor may already have been removed (e.g. on timeout), so the
    // lookup result is wrapped in Option and acted on only if present.
    Option(removeAndGetProcessor(sequenceNumber)).foreach(_.batchProcessed(success))
  }

  /**
   * Helper method to remove the TxnProcessor for a Sequence Number. Can be used to avoid a leak.
   * @param sequenceNumber
   * @return The transaction processor for the corresponding batch. Note that this instance is no
   *         longer tracked and the caller is responsible for that txn processor.
   */
  private[flume] def removeAndGetProcessor(sequenceNumber: CharSequence): TransactionProcessor = {
    // Avro hands us a Utf8 CharSequence whose equals() does not match String,
    // so the key must be normalized via toString before the map lookup.
    processorMap.remove(sequenceNumber.toString) // The toString is required!
  }

  /**
   * Shuts down the executor used to process transactions.
   */
  def shutdown() {
    transactionExecutorOpt.foreach(_.shutdownNow())
  }
}

0 commit comments

Comments
 (0)