From fa22a3879a9d4887cd229966b6523dd65b2f6003 Mon Sep 17 00:00:00 2001 From: fireflyc Date: Fri, 11 Jul 2014 16:03:20 +0800 Subject: [PATCH 1/2] replace println with log4j Our program needs to receive a large amount of data and run for a long time. We set the log level to WARN, but messages such as "Storing iterator" and "received single" were still written to the log file (running over YARN). --- .../spark/streaming/receiver/ActorReceiver.scala | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala index 743be58950c0..4a4a7384f186 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala @@ -68,13 +68,13 @@ object ActorSupervisorStrategy { * should be same. */ @DeveloperApi -trait ActorHelper { +trait ActorHelper extends Logging{ self: Actor => // to ensure that this can be added to Actor classes only /** Store an iterator of received data as a data block into Spark's memory. */ def store[T](iter: Iterator[T]) { - println("Storing iterator") + logInfo("Storing iterator") context.parent ! IteratorData(iter) } @@ -93,7 +93,7 @@ trait ActorHelper { * being pushed into Spark's memory. */ def store[T](item: T) { - println("Storing item") + logInfo("Storing item") context.parent ! 
SingleItemData(item) } } @@ -157,11 +157,11 @@ private[streaming] class ActorReceiver[T: ClassTag]( def receive = { case IteratorData(iterator) => - println("received iterator") + logInfo("received iterator") store(iterator.asInstanceOf[Iterator[T]]) case SingleItemData(msg) => - println("received single") + logInfo("received single") store(msg.asInstanceOf[T]) n.incrementAndGet From e68414002de6c4fb9c24cfd7be5bce57d3adab90 Mon Sep 17 00:00:00 2001 From: fireflyc Date: Wed, 16 Jul 2014 13:29:59 +0800 Subject: [PATCH 2/2] Change 'logInfo' calls to 'logDebug' --- .../spark/streaming/receiver/ActorReceiver.scala | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala index 4a4a7384f186..1868a1ebc7b4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ActorReceiver.scala @@ -74,7 +74,7 @@ trait ActorHelper extends Logging{ /** Store an iterator of received data as a data block into Spark's memory. */ def store[T](iter: Iterator[T]) { - logInfo("Storing iterator") + logDebug("Storing iterator") context.parent ! IteratorData(iter) } @@ -84,6 +84,7 @@ trait ActorHelper extends Logging{ * that Spark is configured to use. */ def store(bytes: ByteBuffer) { + logDebug("Storing Bytes") context.parent ! ByteBufferData(bytes) } @@ -93,7 +94,7 @@ trait ActorHelper extends Logging{ * being pushed into Spark's memory. */ def store[T](item: T) { - logInfo("Storing item") + logDebug("Storing item") context.parent ! 
SingleItemData(item) } } @@ -157,15 +158,16 @@ private[streaming] class ActorReceiver[T: ClassTag]( def receive = { case IteratorData(iterator) => - logInfo("received iterator") + logDebug("received iterator") store(iterator.asInstanceOf[Iterator[T]]) case SingleItemData(msg) => - logInfo("received single") + logDebug("received single") store(msg.asInstanceOf[T]) n.incrementAndGet case ByteBufferData(bytes) => + logDebug("received bytes") store(bytes) case props: Props =>