File tree Expand file tree Collapse file tree 4 files changed +77
-7
lines changed
main/scala/org/apache/spark/streaming/flume/sink
test/scala/org/apache/spark/streaming/flume/sink Expand file tree Collapse file tree 4 files changed +77
-7
lines changed Original file line number Diff line number Diff line change 4242 <dependency >
4343 <groupId >org.apache.flume</groupId >
4444 <artifactId >flume-ng-sdk</artifactId >
45+ <exclusions >
46+ <!-- Guava is excluded to avoid its use in this module. -->
47+ <exclusion >
48+ <groupId >com.google.guava</groupId >
49+ <artifactId >guava</artifactId >
50+ </exclusion >
51+ <!--
52+ Exclude libthrift since the flume poms seem to confuse sbt, which fails to find the
53+ dependency.
54+ -->
55+ <exclusion >
56+ <groupId >org.apache.thrift</groupId >
57+ <artifactId >libthrift</artifactId >
58+ </exclusion >
59+ </exclusions >
4560 </dependency >
4661 <dependency >
4762 <groupId >org.apache.flume</groupId >
4863 <artifactId >flume-ng-core</artifactId >
64+ <exclusions >
65+ <exclusion >
66+ <groupId >com.google.guava</groupId >
67+ <artifactId >guava</artifactId >
68+ </exclusion >
69+ <exclusion >
70+ <groupId >org.apache.thrift</groupId >
71+ <artifactId >libthrift</artifactId >
72+ </exclusion >
73+ </exclusions >
4974 </dependency >
5075 <dependency >
5176 <groupId >org.scala-lang</groupId >
5277 <artifactId >scala-library</artifactId >
5378 </dependency >
79+ <dependency >
80+ <!-- Add Guava in test scope since flume actually needs it. -->
81+ <groupId >com.google.guava</groupId >
82+ <artifactId >guava</artifactId >
83+ <scope >test</scope >
84+ </dependency >
5485 <dependency >
5586 <!--
5687 Netty explicitly added in test as it has been excluded from
85116 </execution >
86117 </executions >
87118 </plugin >
119+ <plugin >
120+ <groupId >org.apache.maven.plugins</groupId >
121+ <artifactId >maven-shade-plugin</artifactId >
122+ <configuration >
123+ <!-- Disable all relocations defined in the parent pom. -->
124+ <relocations combine.self=" override" />
125+ </configuration >
126+ </plugin >
88127 </plugins >
89128 </build >
90129</project >
Original file line number Diff line number Diff line change @@ -21,7 +21,6 @@ import java.util.concurrent.atomic.AtomicLong
2121
2222import scala .collection .mutable
2323
24- import com .google .common .util .concurrent .ThreadFactoryBuilder
2524import org .apache .flume .Channel
2625import org .apache .commons .lang3 .RandomStringUtils
2726
@@ -45,8 +44,7 @@ import org.apache.commons.lang3.RandomStringUtils
4544private [flume] class SparkAvroCallbackHandler (val threads : Int , val channel : Channel ,
4645 val transactionTimeout : Int , val backOffInterval : Int ) extends SparkFlumeProtocol with Logging {
4746 val transactionExecutorOpt = Option (Executors .newFixedThreadPool(threads,
48- new ThreadFactoryBuilder ().setDaemon(true )
49- .setNameFormat(" Spark Sink Processor Thread - %d" ).build()))
47+ new SparkSinkThreadFactory (" Spark Sink Processor Thread - %d" )))
5048 // Protected by `sequenceNumberToProcessor`
5149 private val sequenceNumberToProcessor = mutable.HashMap [CharSequence , TransactionProcessor ]()
5250 // This sink will not persist sequence numbers and reuses them if it gets restarted.
Original file line number Diff line number Diff line change 1+ /*
2+ * Licensed to the Apache Software Foundation (ASF) under one or more
3+ * contributor license agreements. See the NOTICE file distributed with
4+ * this work for additional information regarding copyright ownership.
5+ * The ASF licenses this file to You under the Apache License, Version 2.0
6+ * (the "License"); you may not use this file except in compliance with
7+ * the License. You may obtain a copy of the License at
8+ *
9+ * http://www.apache.org/licenses/LICENSE-2.0
10+ *
11+ * Unless required by applicable law or agreed to in writing, software
12+ * distributed under the License is distributed on an "AS IS" BASIS,
13+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+ * See the License for the specific language governing permissions and
15+ * limitations under the License.
16+ */
17+ package org .apache .spark .streaming .flume .sink
18+
19+ import java .util .concurrent .ThreadFactory
20+ import java .util .concurrent .atomic .AtomicLong
21+
22+ /**
23+ * Thread factory that generates daemon threads with a specified name format.
24+ */
25+ private [sink] class SparkSinkThreadFactory (nameFormat : String ) extends ThreadFactory {
26+
27+ private val threadId = new AtomicLong ()
28+
29+ override def newThread (r : Runnable ): Thread = {
30+ val t = new Thread (r, nameFormat.format(threadId.incrementAndGet()))
31+ t.setDaemon(true )
32+ t
33+ }
34+
35+ }
Original file line number Diff line number Diff line change @@ -24,7 +24,6 @@ import scala.collection.JavaConversions._
2424import scala .concurrent .{ExecutionContext , Future }
2525import scala .util .{Failure , Success }
2626
27- import com .google .common .util .concurrent .ThreadFactoryBuilder
2827import org .apache .avro .ipc .NettyTransceiver
2928import org .apache .avro .ipc .specific .SpecificRequestor
3029import org .apache .flume .Context
@@ -185,9 +184,8 @@ class SparkSinkSuite extends FunSuite {
185184 count : Int ): Seq [(NettyTransceiver , SparkFlumeProtocol .Callback )] = {
186185
187186 (1 to count).map(_ => {
188- lazy val channelFactoryExecutor =
189- Executors .newCachedThreadPool(new ThreadFactoryBuilder ().setDaemon(true ).
190- setNameFormat(" Flume Receiver Channel Thread - %d" ).build())
187+ lazy val channelFactoryExecutor = Executors .newCachedThreadPool(
188+ new SparkSinkThreadFactory (" Flume Receiver Channel Thread - %d" ))
191189 lazy val channelFactory =
192190 new NioClientSocketChannelFactory (channelFactoryExecutor, channelFactoryExecutor)
193191 val transceiver = new NettyTransceiver (address, channelFactory)
You can’t perform that action at this time.
0 commit comments