From 87032d23a1ebfa909408edb193763165f351383a Mon Sep 17 00:00:00 2001 From: sachin aggarwal Date: Wed, 23 Mar 2016 17:27:53 +0530 Subject: [PATCH 01/13] adding initial code for CEP --- .../dstream/PatternMatchedDStream.scala | 190 ++++++++++++++++++ 1 file changed, 190 insertions(+) create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/dstream/PatternMatchedDStream.scala diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PatternMatchedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PatternMatchedDStream.scala new file mode 100644 index 000000000000..8df4d25a7aa9 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PatternMatchedDStream.scala @@ -0,0 +1,190 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.rdd.{PartitionerAwareUnionRDD, RDD, UnionRDD} +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Duration, Interval, Time} +import org.apache.spark.{Accumulator, Partitioner, SparkContext} + +import scala.reflect.ClassTag + +/** + * Created by agsachin on 26/02/16. 
+ */ + +object PatternMatchedDStream { + private var tracker: Accumulator[Long] = null + + def getInstance(sc: SparkContext): Accumulator[Long] = { + if (tracker == null) { + synchronized { + if (tracker == null) { + tracker = sc.accumulator(0L, "PatternMatchedDStream") + } + } + } + tracker + } +} + +case class WindowMetric(first: Any, prev: Any) {} + +private[streaming] +class PatternMatchedDStream[T: ClassTag]( + parent: DStream[T], + pattern: scala.util.matching.Regex, + predicates: Map[String, (T, WindowMetric) => Boolean], + _windowDuration: Duration, + _slideDuration: Duration, + partitioner: Partitioner + ) extends DStream[List[T]](parent.ssc) { + + require(_windowDuration.isMultipleOf(parent.slideDuration), + "The window duration of ReducedWindowedDStream (" + _windowDuration + ") " + + "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")" + ) + + require(_slideDuration.isMultipleOf(parent.slideDuration), + "The slide duration of ReducedWindowedDStream (" + _slideDuration + ") " + + "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")" + ) + + super.persist(StorageLevel.MEMORY_ONLY_SER) + + def windowDuration: Duration = _windowDuration + + override def dependencies: List[DStream[_]] = List(parent) + + override def slideDuration: Duration = _slideDuration + + override val mustCheckpoint = true + + override def parentRememberDuration: Duration = rememberDuration + windowDuration + + val applyRegex = (rdd: RDD[(Long, (String, T))]) => { + val patternCopy = pattern + import scala.collection.mutable.ListBuffer + val stream = rdd.groupBy(_ => "all") + .map(_._2) + .map(x => { + val it = x.iterator + val builder = new scala.collection.mutable.StringBuilder() + val map = scala.collection.mutable.HashMap[Long, Tuple2[Long,T]] () + var curLen = 1L + while (it.hasNext) { + val i = it.next() + builder.append(" " + i._2._1) + map.put(curLen, (i._1, i._2._2)) + curLen += i._2._1.length + 1 + } + //println(builder.toString(), map) + (builder.toString(), map) + }) + val tracker = PatternMatchedDStream.getInstance(this.ssc.sparkContext) + + stream.flatMap(y => { + val it = patternCopy.findAllIn(y._1) + val o = ListBuffer[List[T]] () + for (one <- it) { + var len = 0 + var ctr = 0 + val list = ListBuffer[T] () + one.split(" ").map(sub => { + val id = y._2.get(it.start + len) + list += id.get._2 + //println("id="+id.get._1) + if (tracker.toString.toLong < id.get._1) { + tracker.setValue(id.get._1) + } + len += sub.length + 1 + }) + o += list.toList + } + // o.toList.foreach(println) + o.toList + }) + } + + override def compute(validTime: Time): Option[RDD[List[T]]] = { + val predicatesCopy = predicates + val patternCopy = pattern; + val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime) + val previousWindow = currentWindow - slideDuration + + val tracker = PatternMatchedDStream.getInstance(this.ssc.sparkContext) + + val oldRDDs = + parent.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration) + + val oldRDD = if (oldRDDs.flatMap(_.partitioner).distinct.length == 1) { + logDebug("Using partition aware union for windowing at " + validTime) + new PartitionerAwareUnionRDD(ssc.sc, oldRDDs) + } else { + logDebug("Using normal union for windowing at " + validTime) + new UnionRDD(ssc.sc, oldRDDs) + } + + val increment = oldRDD.count() + + val rddsInWindow = parent.slice(currentWindow) + val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) { + 
logDebug("Using partition aware union for windowing at " + validTime) + new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow) + } else { + logDebug("Using normal union for windowing at " + validTime) + new UnionRDD(ssc.sc, rddsInWindow) + } + + if (!windowRDD.isEmpty()) { + //println(windowRDD.count) + val first = windowRDD.first() + var shift = 0L + if (tracker.toString.toLong > 0L) { + shift = tracker.toString.toLong - increment + tracker.setValue(0L) + } + + val zippedRDD = windowRDD.zipWithIndex().map(x => (x._2, x._1)).sortByKey() + val selectedRDD = zippedRDD.filter(x => x._1 > shift) + + val prevKeyRdd = selectedRDD.map(i => { + (i._1 + 1, i._2) + }) + + val sortedprevMappedRdd = selectedRDD.leftOuterJoin(prevKeyRdd).sortByKey() + + val taggedRDD = sortedprevMappedRdd.map(x => { + var isMatch = false + var matchName = "NAN" + for (predicate <- predicatesCopy if !isMatch) { + isMatch = predicate._2(x._2._1, WindowMetric(first, x._2._2.getOrElse(first))) + if (isMatch) { + matchName = predicate._1 + } + } + // println(x._1, (matchName, x._2._1)) + (x._1, (matchName, x._2._1)) + }) + + Some(applyRegex(taggedRDD)) + } + else + None + } +} \ No newline at end of file From cca266e84f123061d186ad8d9f8fdaafc6bff236 Mon Sep 17 00:00:00 2001 From: sachin aggarwal Date: Wed, 23 Mar 2016 17:30:54 +0530 Subject: [PATCH 02/13] adding initial code for CEP --- .../apache/spark/streaming/dstream/DStream.scala | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index eb7b64eaf497..a1f497edfc1c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -20,12 +20,13 @@ package org.apache.spark.streaming.dstream import java.io.{IOException, ObjectInputStream, ObjectOutputStream} + import scala.collection.mutable.HashMap import scala.language.implicitConversions import scala.reflect.ClassTag import scala.util.matching.Regex -import org.apache.spark.{SparkContext, SparkException} +import org.apache.spark.{HashPartitioner, SparkContext, SparkException} import org.apache.spark.internal.Logging import org.apache.spark.rdd.{BlockRDD, PairRDDFunctions, RDD, RDDOperationScope} import org.apache.spark.storage.StorageLevel @@ -856,6 +857,19 @@ abstract class DStream[T: ClassTag] ( ) } + + def matchPatternByWindow( + pattern: scala.util.matching.Regex, + predicates: Map[String, (T, WindowMetric) => Boolean], + windowDuration: Duration, + slideDuration: Duration + ): DStream[List[T]] = ssc.withScope { + new PatternMatchedDStream[T]( + this, pattern, predicates, + windowDuration, slideDuration, new HashPartitioner(1) + ) + } + /** * Return a new DStream by unifying data of another DStream with this DStream. * @param that Another DStream having the same slideDuration as this DStream. 
From 633408de388db4decc38616327e588477e7147d8 Mon Sep 17 00:00:00 2001 From: mariobriggs Date: Wed, 30 Mar 2016 20:21:38 +0530 Subject: [PATCH 03/13] sortedPrevMappedStream created incrementally --- .../kafka/DirectKafkaStreamSuite.scala | 68 ++++++++++- .../dstream/PatternMatchedDStream.scala | 113 +++++++++++------- 2 files changed, 140 insertions(+), 41 deletions(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index f14ff6705fd9..dfaecc67a84d 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -36,7 +36,7 @@ import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time} -import org.apache.spark.streaming.dstream.DStream +import org.apache.spark.streaming.dstream.{WindowMetric, DStream} import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset import org.apache.spark.streaming.scheduler._ import org.apache.spark.streaming.scheduler.rate.RateEstimator @@ -521,3 +521,69 @@ private[streaming] class ConstantRateController(id: Int, estimator: RateEstimato override def publish(rate: Long): Unit = () override def getLatestRate(): Long = rate } + +object DirectKafkaWordCount { + + import org.apache.spark.streaming._ + + def main(args: Array[String]) { + val brokers = "localhost:9092" + val topics = "stocks" + + + // Create context with 2 second batch interval + val sparkConf = new SparkConf().setMaster("local[*]").setAppName("DirectKafkaWordCount") + val ssc = new StreamingContext(sparkConf, Seconds(6)) + ssc.checkpoint("/Users/mbriggs/work/stc/projects/logs") + + // Create direct kafka stream with brokers and topics + val topicsSet = topics.split(",").toSet + val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, + "auto.offset.reset"->"smallest") + + case class Tick(symbol: String, price: Int, ts: Long) + + // Read a stream of message from kafka + val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( + ssc, kafkaParams, topicsSet) + + // Get each message and put into a Tick object + val ticks = messages.map(_._2) + .map(_.split("[,:]")).map(p => { + Tick(p(1), p(3).trim.toInt, p(5).trim.toLong) + }) + + // define rise, drop & deep predicates + def rise(in: Tick, ew: WindowMetric): Boolean = { + in.price > ew.first.asInstanceOf[Tick].price && + in.price >= ew.prev.asInstanceOf[Tick].price + } + def drop(in: Tick, ew: WindowMetric): Boolean = { + in.price >= ew.first.asInstanceOf[Tick].price && + in.price < ew.prev.asInstanceOf[Tick].price + } + def deep(in: Tick, ew: WindowMetric): Boolean = { + in.price < ew.first.asInstanceOf[Tick].price && + in.price < ew.prev.asInstanceOf[Tick].price + } + + // map the predicates to their state names + val predicateMapping: Map[String, (Tick, WindowMetric) => Boolean] = + Map("rise" -> rise, "drop" -> drop, "deep" -> deep) + + val matches = ticks.matchPatternByWindow("rise drop [rise ]+ deep".r, + predicateMapping, Seconds(12), Seconds(6)) + + matches.print() + + + ssc.start() + ssc.awaitTermination() + + //matches.foreachRDD(_.collect().foreach( x => println("match " + x))) + + // Start the computation + + } +} + diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PatternMatchedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PatternMatchedDStream.scala index 8df4d25a7aa9..db2f8611416a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PatternMatchedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PatternMatchedDStream.scala @@ -17,6 +17,7 @@ package org.apache.spark.streaming.dstream +import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.{PartitionerAwareUnionRDD, RDD, UnionRDD} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.{Duration, Interval, Time} @@ -65,16 +66,53 @@ class PatternMatchedDStream[T: ClassTag]( "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")" ) + private def maxTStream(in: DStream[(Long, (T, Option[T]))]) : DStream[(Long, T)] = { + in.reduce((e1, e2) => { + if (e1._1 > e2._1) { + e1 + } else { + e2 + } + }).map(max => (max._1, max._2._1)) + } + + private def keyWithPreviousEvent(in: DStream[T]) : DStream[(Long, (T, Option[T]))] = { + val keyedRdd = in.transform((rdd, time) => { + val offset = System.nanoTime() + rdd.zipWithIndex().map(i => { + (i._2 + offset, i._1)}) + }) + val previousEventRdd = keyedRdd.map(x => (x._1 + 1, x._2) ) + keyedRdd.leftOuterJoin(previousEventRdd).transform(rdd => {rdd.sortByKey()} ) + } + + val sortedprevMappedStream = keyWithPreviousEvent(parent) + val maxKeyStream = maxTStream(sortedprevMappedStream) + super.persist(StorageLevel.MEMORY_ONLY_SER) + sortedprevMappedStream.persist(StorageLevel.MEMORY_ONLY_SER) + maxKeyStream.persist(StorageLevel.MEMORY_ONLY_SER) def windowDuration: Duration = _windowDuration - override def dependencies: List[DStream[_]] = List(parent) + override def dependencies: List[DStream[_]] = List(sortedprevMappedStream, maxKeyStream) override def slideDuration: Duration = _slideDuration override val mustCheckpoint = true + override def persist(storageLevel: StorageLevel): DStream[List[T]] = { + super.persist(storageLevel) + sortedprevMappedStream.persist(storageLevel) + maxKeyStream.persist(storageLevel) + this + } + + override def checkpoint(interval: Duration): DStream[List[T]] = { + super.checkpoint(interval) + this + } + override def parentRememberDuration: Duration = rememberDuration + windowDuration val applyRegex = (rdd: RDD[(Long, (String, T))]) => { @@ -85,7 +123,7 @@ class PatternMatchedDStream[T: ClassTag]( .map(x => { val it = x.iterator val builder = new scala.collection.mutable.StringBuilder() - val map = scala.collection.mutable.HashMap[Long, Tuple2[Long,T]] () + val map = scala.collection.mutable.HashMap[Long, Tuple2[Long, T]] () var curLen = 1L while (it.hasNext) { val i = it.next() @@ -93,7 +131,6 @@ class PatternMatchedDStream[T: ClassTag]( map.put(curLen, (i._1, i._2._2)) curLen += i._2._1.length + 1 } - //println(builder.toString(), map) (builder.toString(), map) }) val tracker = PatternMatchedDStream.getInstance(this.ssc.sparkContext) @@ -108,7 +145,6 @@ class PatternMatchedDStream[T: ClassTag]( one.split(" ").map(sub => { val id = y._2.get(it.start + len) list += id.get._2 - //println("id="+id.get._1) if (tracker.toString.toLong < id.get._1) { tracker.setValue(id.get._1) } @@ -116,33 +152,33 @@ class PatternMatchedDStream[T: ClassTag]( }) o += list.toList } - // o.toList.foreach(println) o.toList }) } + private def getLastMaxKey(rdds: Seq[RDD[(Long, T)]]): Option[T] = { + if (rdds.length > 0) { + (0 to rdds.size-1).map(i => { + 
if (!rdds(rdds.size-1-i).isEmpty()) { + return Some(rdds(rdds.size-1-i).reduce((e1, e2) => { if (e1._1 > e2._1) e1 else e2 })._2) + } + }) + } + None: Option[T] + } + override def compute(validTime: Time): Option[RDD[List[T]]] = { val predicatesCopy = predicates val patternCopy = pattern; val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime) val previousWindow = currentWindow - slideDuration - val tracker = PatternMatchedDStream.getInstance(this.ssc.sparkContext) - val oldRDDs = - parent.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration) + // first get the row with max key in the last window and broadcast it + val oldMaxStreamRDDs = maxKeyStream.slice(previousWindow) + val brdcst = ssc.sparkContext.broadcast(getLastMaxKey(oldMaxStreamRDDs)) - val oldRDD = if (oldRDDs.flatMap(_.partitioner).distinct.length == 1) { - logDebug("Using partition aware union for windowing at " + validTime) - new PartitionerAwareUnionRDD(ssc.sc, oldRDDs) - } else { - logDebug("Using normal union for windowing at " + validTime) - new UnionRDD(ssc.sc, oldRDDs) - } - - val increment = oldRDD.count() - - val rddsInWindow = parent.slice(currentWindow) + val rddsInWindow = sortedprevMappedStream.slice(currentWindow) val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) { logDebug("Using partition aware union for windowing at " + validTime) new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow) @@ -152,39 +188,36 @@ class PatternMatchedDStream[T: ClassTag]( } if (!windowRDD.isEmpty()) { - //println(windowRDD.count) val first = windowRDD.first() - var shift = 0L - if (tracker.toString.toLong > 0L) { - shift = tracker.toString.toLong - increment - tracker.setValue(0L) - } - - val zippedRDD = windowRDD.zipWithIndex().map(x => (x._2, x._1)).sortByKey() - val selectedRDD = zippedRDD.filter(x => x._1 > shift) - - val prevKeyRdd = selectedRDD.map(i => { - (i._1 + 1, i._2) - }) - - val sortedprevMappedRdd = selectedRDD.leftOuterJoin(prevKeyRdd).sortByKey() - - val taggedRDD = sortedprevMappedRdd.map(x => { + val taggedRDD = windowRDD.map(x => { var isMatch = false var matchName = "NAN" for (predicate <- predicatesCopy if !isMatch) { - isMatch = predicate._2(x._2._1, WindowMetric(first, x._2._2.getOrElse(first))) + val prev = x._2._2 match { + case Some(i) => i.asInstanceOf[T] + case None => { + if (first._1 == x._1) { // this is the first in window + first._2._1 + } + else { // this is a batchInterval boundary + brdcst.value.get + } + } + } + println("broadcast " + brdcst.value) + isMatch = predicate._2(x._2._1, WindowMetric(first._2._1, x._2._2.getOrElse(prev))) if (isMatch) { matchName = predicate._1 } } - // println(x._1, (matchName, x._2._1)) (x._1, (matchName, x._2._1)) }) Some(applyRegex(taggedRDD)) } - else + else { None + } } -} \ No newline at end of file +} + From 087bac9bf8158ff1b556166c24a039fffc5b9f3d Mon Sep 17 00:00:00 2001 From: mariobriggs Date: Thu, 31 Mar 2016 18:33:57 +0530 Subject: [PATCH 04/13] add back dynamic window --- .../dstream/PatternMatchedDStream.scala | 21 ++++++++++++++++--- 1 file changed, 18 insertions(+), 3 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PatternMatchedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PatternMatchedDStream.scala index db2f8611416a..4f2b859ab5de 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PatternMatchedDStream.scala +++ 
b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PatternMatchedDStream.scala @@ -145,8 +145,10 @@ class PatternMatchedDStream[T: ClassTag]( one.split(" ").map(sub => { val id = y._2.get(it.start + len) list += id.get._2 + println("matched " + tracker.toString) if (tracker.toString.toLong < id.get._1) { tracker.setValue(id.get._1) + println("set it") } len += sub.length + 1 }) @@ -188,8 +190,21 @@ class PatternMatchedDStream[T: ClassTag]( } if (!windowRDD.isEmpty()) { - val first = windowRDD.first() - val taggedRDD = windowRDD.map(x => { + + // we dont want to report old matched patterns in current window if any + val tracker = PatternMatchedDStream.getInstance(this.ssc.sparkContext) + println("init " + tracker.toString) + val shift = if (tracker.toString.toLong > 0L) { + val copy = tracker.toString.toLong + tracker.setValue(0L) + copy + } else { + 0L + } + val newRDD = windowRDD.filter(x => x._1 > shift) + + val first = newRDD.first() + val taggedRDD = newRDD.map(x => { var isMatch = false var matchName = "NAN" for (predicate <- predicatesCopy if !isMatch) { @@ -204,7 +219,7 @@ class PatternMatchedDStream[T: ClassTag]( } } } - println("broadcast " + brdcst.value) + //println("broadcast " + brdcst.value) isMatch = predicate._2(x._2._1, WindowMetric(first._2._1, x._2._2.getOrElse(prev))) if (isMatch) { matchName = predicate._1 From 750c9716626b1bf0213625cc7b4365399c063d81 Mon Sep 17 00:00:00 2001 From: mariobriggs Date: Thu, 31 Mar 2016 19:33:14 +0530 Subject: [PATCH 05/13] remove println & api Cleanup --- .../spark/streaming/dstream/DStream.scala | 21 +++++++++++++----- .../dstream/PatternMatchedDStream.scala | 22 ++++++++----------- 2 files changed, 24 insertions(+), 19 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index a1f497edfc1c..bf0ca455ed3c 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -858,15 +858,24 @@ abstract class DStream[T: ClassTag] ( } + /** + * + * @param pattern + * @param predicates + * @param windowDuration + * @param slideDuration + * @return + */ def matchPatternByWindow( - pattern: scala.util.matching.Regex, - predicates: Map[String, (T, WindowMetric) => Boolean], - windowDuration: Duration, - slideDuration: Duration - ): DStream[List[T]] = ssc.withScope { + pattern: scala.util.matching.Regex, + predicates: Map[String, (T, WindowMetric) => Boolean], + windowDuration: Duration, + slideDuration: Duration) + (implicit ord: Ordering[T] = null) + : DStream[List[T]] = ssc.withScope { new PatternMatchedDStream[T]( this, pattern, predicates, - windowDuration, slideDuration, new HashPartitioner(1) + windowDuration, slideDuration ) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PatternMatchedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PatternMatchedDStream.scala index 4f2b859ab5de..7a02828af777 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PatternMatchedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PatternMatchedDStream.scala @@ -48,21 +48,20 @@ case class WindowMetric(first: Any, prev: Any) {} private[streaming] class PatternMatchedDStream[T: ClassTag]( - parent: DStream[T], - pattern: scala.util.matching.Regex, - predicates: Map[String, (T, WindowMetric) => Boolean], - _windowDuration: Duration, 
- _slideDuration: Duration, - partitioner: Partitioner - ) extends DStream[List[T]](parent.ssc) { + parent: DStream[T], + pattern: scala.util.matching.Regex, + predicates: Map[String, (T, WindowMetric) => Boolean], + _windowDuration: Duration, + _slideDuration: Duration + ) extends DStream[List[T]](parent.ssc) { require(_windowDuration.isMultipleOf(parent.slideDuration), - "The window duration of ReducedWindowedDStream (" + _windowDuration + ") " + + "The window duration of PatternMatchedDStream (" + _windowDuration + ") " + "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")" ) require(_slideDuration.isMultipleOf(parent.slideDuration), - "The slide duration of ReducedWindowedDStream (" + _slideDuration + ") " + + "The slide duration of PatternMatchedDStream (" + _slideDuration + ") " + "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")" ) @@ -145,10 +144,8 @@ class PatternMatchedDStream[T: ClassTag]( one.split(" ").map(sub => { val id = y._2.get(it.start + len) list += id.get._2 - println("matched " + tracker.toString) if (tracker.toString.toLong < id.get._1) { tracker.setValue(id.get._1) - println("set it") } len += sub.length + 1 }) @@ -192,8 +189,8 @@ class PatternMatchedDStream[T: ClassTag]( if (!windowRDD.isEmpty()) { // we dont want to report old matched patterns in current window if any + // so skip to the ID maintained in tracker val tracker = PatternMatchedDStream.getInstance(this.ssc.sparkContext) - println("init " + tracker.toString) val shift = if (tracker.toString.toLong > 0L) { val copy = tracker.toString.toLong tracker.setValue(0L) @@ -219,7 +216,6 @@ class PatternMatchedDStream[T: ClassTag]( } } } - //println("broadcast " + brdcst.value) isMatch = predicate._2(x._2._1, WindowMetric(first._2._1, x._2._2.getOrElse(prev))) if (isMatch) { matchName = predicate._1 From 9a4d32dcc01bce30a3322fac95be3ec839dbcd59 Mon Sep 17 00:00:00 2001 From: mariobriggs Date: Thu, 31 Mar 2016 19:39:54 +0530 Subject: [PATCH 06/13] changed WindowMetric to WindowState --- .../spark/streaming/kafka/DirectKafkaStreamSuite.scala | 10 +++++----- .../org/apache/spark/streaming/dstream/DStream.scala | 2 +- .../streaming/dstream/PatternMatchedDStream.scala | 6 +++--- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index dfaecc67a84d..f85284e55efa 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -36,7 +36,7 @@ import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time} -import org.apache.spark.streaming.dstream.{WindowMetric, DStream} +import org.apache.spark.streaming.dstream.{WindowState, DStream} import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset import org.apache.spark.streaming.scheduler._ import org.apache.spark.streaming.scheduler.rate.RateEstimator @@ -554,21 +554,21 @@ object DirectKafkaWordCount { }) // define rise, drop & deep predicates - def rise(in: Tick, ew: WindowMetric): Boolean = { + def rise(in: Tick, ew: WindowState): Boolean = { in.price > ew.first.asInstanceOf[Tick].price && in.price >= 
ew.prev.asInstanceOf[Tick].price } - def drop(in: Tick, ew: WindowMetric): Boolean = { + def drop(in: Tick, ew: WindowState): Boolean = { in.price >= ew.first.asInstanceOf[Tick].price && in.price < ew.prev.asInstanceOf[Tick].price } - def deep(in: Tick, ew: WindowMetric): Boolean = { + def deep(in: Tick, ew: WindowState): Boolean = { in.price < ew.first.asInstanceOf[Tick].price && in.price < ew.prev.asInstanceOf[Tick].price } // map the predicates to their state names - val predicateMapping: Map[String, (Tick, WindowMetric) => Boolean] = + val predicateMapping: Map[String, (Tick, WindowState) => Boolean] = Map("rise" -> rise, "drop" -> drop, "deep" -> deep) val matches = ticks.matchPatternByWindow("rise drop [rise ]+ deep".r, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index bf0ca455ed3c..7dfb324b249f 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -868,7 +868,7 @@ abstract class DStream[T: ClassTag] ( */ def matchPatternByWindow( pattern: scala.util.matching.Regex, - predicates: Map[String, (T, WindowMetric) => Boolean], + predicates: Map[String, (T, WindowState) => Boolean], windowDuration: Duration, slideDuration: Duration) (implicit ord: Ordering[T] = null) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PatternMatchedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PatternMatchedDStream.scala index 7a02828af777..7bc5a1c33702 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PatternMatchedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PatternMatchedDStream.scala @@ -44,13 +44,13 @@ object PatternMatchedDStream { } } -case class WindowMetric(first: Any, prev: Any) {} +case class WindowState(first: Any, prev: Any) {} private[streaming] class PatternMatchedDStream[T: ClassTag]( parent: DStream[T], pattern: scala.util.matching.Regex, - predicates: Map[String, (T, WindowMetric) => Boolean], + predicates: Map[String, (T, WindowState) => Boolean], _windowDuration: Duration, _slideDuration: Duration ) extends DStream[List[T]](parent.ssc) { @@ -216,7 +216,7 @@ class PatternMatchedDStream[T: ClassTag]( } } } - isMatch = predicate._2(x._2._1, WindowMetric(first._2._1, x._2._2.getOrElse(prev))) + isMatch = predicate._2(x._2._1, WindowState(first._2._1, x._2._2.getOrElse(prev))) if (isMatch) { matchName = predicate._1 } From e433602529dfb5b84eaf0390b8324c40a6e2d7a7 Mon Sep 17 00:00:00 2001 From: mariobriggs Date: Thu, 31 Mar 2016 19:45:41 +0530 Subject: [PATCH 07/13] undo add to test suite --- .../kafka/DirectKafkaStreamSuite.scala | 68 +------------------ 1 file changed, 1 insertion(+), 67 deletions(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index f85284e55efa..8a85c632340e 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -520,70 +520,4 @@ private[streaming] class ConstantRateController(id: Int, estimator: RateEstimato extends RateController(id, estimator) { override def publish(rate: Long): Unit = () override def getLatestRate(): Long = rate -} 
- -object DirectKafkaWordCount { - - import org.apache.spark.streaming._ - - def main(args: Array[String]) { - val brokers = "localhost:9092" - val topics = "stocks" - - - // Create context with 2 second batch interval - val sparkConf = new SparkConf().setMaster("local[*]").setAppName("DirectKafkaWordCount") - val ssc = new StreamingContext(sparkConf, Seconds(6)) - ssc.checkpoint("/Users/mbriggs/work/stc/projects/logs") - - // Create direct kafka stream with brokers and topics - val topicsSet = topics.split(",").toSet - val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, - "auto.offset.reset"->"smallest") - - case class Tick(symbol: String, price: Int, ts: Long) - - // Read a stream of message from kafka - val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( - ssc, kafkaParams, topicsSet) - - // Get each message and put into a Tick object - val ticks = messages.map(_._2) - .map(_.split("[,:]")).map(p => { - Tick(p(1), p(3).trim.toInt, p(5).trim.toLong) - }) - - // define rise, drop & deep predicates - def rise(in: Tick, ew: WindowState): Boolean = { - in.price > ew.first.asInstanceOf[Tick].price && - in.price >= ew.prev.asInstanceOf[Tick].price - } - def drop(in: Tick, ew: WindowState): Boolean = { - in.price >= ew.first.asInstanceOf[Tick].price && - in.price < ew.prev.asInstanceOf[Tick].price - } - def deep(in: Tick, ew: WindowState): Boolean = { - in.price < ew.first.asInstanceOf[Tick].price && - in.price < ew.prev.asInstanceOf[Tick].price - } - - // map the predicates to their state names - val predicateMapping: Map[String, (Tick, WindowState) => Boolean] = - Map("rise" -> rise, "drop" -> drop, "deep" -> deep) - - val matches = ticks.matchPatternByWindow("rise drop [rise ]+ deep".r, - predicateMapping, Seconds(12), Seconds(6)) - - matches.print() - - - ssc.start() - ssc.awaitTermination() - - //matches.foreachRDD(_.collect().foreach( x => println("match " + x))) - - // Start the computation - - } -} - +} \ No newline at end of file From 5e67477f5dd187a371f1f6916fae2e75a652f4ac Mon Sep 17 00:00:00 2001 From: mariobriggs Date: Thu, 31 Mar 2016 19:47:47 +0530 Subject: [PATCH 08/13] more undo to test suite --- .../apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index 8a85c632340e..f14ff6705fd9 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -36,7 +36,7 @@ import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.internal.Logging import org.apache.spark.rdd.RDD import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time} -import org.apache.spark.streaming.dstream.{WindowState, DStream} +import org.apache.spark.streaming.dstream.DStream import org.apache.spark.streaming.kafka.KafkaCluster.LeaderOffset import org.apache.spark.streaming.scheduler._ import org.apache.spark.streaming.scheduler.rate.RateEstimator @@ -520,4 +520,4 @@ private[streaming] class ConstantRateController(id: Int, estimator: RateEstimato extends RateController(id, estimator) { override def publish(rate: Long): Unit = () override def getLatestRate(): Long = rate -} \ No newline at end of file +} From 
94410a75a279261e62a0bdd5aaf9bda74c08a022 Mon Sep 17 00:00:00 2001 From: Mario Briggs Date: Wed, 13 Apr 2016 22:58:15 +0530 Subject: [PATCH 09/13] Per part commit --- .../dstream/PatternMatchedDStream.scala | 269 ++++++++++++++---- 1 file changed, 219 insertions(+), 50 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PatternMatchedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PatternMatchedDStream.scala index 7bc5a1c33702..4615c63a99cf 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PatternMatchedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PatternMatchedDStream.scala @@ -20,14 +20,14 @@ package org.apache.spark.streaming.dstream import org.apache.spark.broadcast.Broadcast import org.apache.spark.rdd.{PartitionerAwareUnionRDD, RDD, UnionRDD} import org.apache.spark.storage.StorageLevel -import org.apache.spark.streaming.{Duration, Interval, Time} -import org.apache.spark.{Accumulator, Partitioner, SparkContext} +import org.apache.spark.streaming.{StreamingContext, Duration, Interval, Time} +import org.apache.spark.{HashPartitioner, Accumulator, Partitioner, SparkContext} import scala.reflect.ClassTag /** - * Created by agsachin on 26/02/16. - */ + * Created by agsachin on 26/02/16. + */ object PatternMatchedDStream { private var tracker: Accumulator[Long] = null @@ -48,12 +48,12 @@ case class WindowState(first: Any, prev: Any) {} private[streaming] class PatternMatchedDStream[T: ClassTag]( - parent: DStream[T], - pattern: scala.util.matching.Regex, - predicates: Map[String, (T, WindowState) => Boolean], - _windowDuration: Duration, - _slideDuration: Duration - ) extends DStream[List[T]](parent.ssc) { + parent: DStream[T], + pattern: scala.util.matching.Regex, + predicates: Map[String, (T, WindowState) => Boolean], + _windowDuration: Duration, + _slideDuration: Duration + ) extends DStream[List[T]](parent.ssc) { require(_windowDuration.isMultipleOf(parent.slideDuration), "The window duration of PatternMatchedDStream (" + _windowDuration + ") " + @@ -65,6 +65,7 @@ class PatternMatchedDStream[T: ClassTag]( "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")" ) + var brdcst = ssc.sparkContext.broadcast((0L, None: Option[T])) private def maxTStream(in: DStream[(Long, (T, Option[T]))]) : DStream[(Long, T)] = { in.reduce((e1, e2) => { if (e1._1 > e2._1) { @@ -76,13 +77,45 @@ class PatternMatchedDStream[T: ClassTag]( } private def keyWithPreviousEvent(in: DStream[T]) : DStream[(Long, (T, Option[T]))] = { - val keyedRdd = in.transform((rdd, time) => { + /* val keyedRdd = in.transform((rdd, time) => { val offset = System.nanoTime() rdd.zipWithIndex().map(i => { (i._2 + offset, i._1)}) + }) */ + val keyedRdd = in.transform((rdd, time) => { + val brdcstCopy = brdcst + val n = rdd.zipWithIndex().map(i => { + (i._2 + brdcstCopy.value._1 + 1, i._1)}) + /* n.mapPartitionsWithIndex{ (index, itr) => { + println(index + " - " + itr.toList) + itr.toList.map(x => (index, x)).iterator }} */ + n }) - val previousEventRdd = keyedRdd.map(x => (x._1 + 1, x._2) ) - keyedRdd.leftOuterJoin(previousEventRdd).transform(rdd => {rdd.sortByKey()} ) + + val previousEventRdd = keyedRdd.map(x => (x._1 + 1, x._2)) + val o = keyedRdd.leftOuterJoin(previousEventRdd, 3).transform(rdd => rdd.sortByKey()) + .transform(rdd => { + val brdcstCopy = brdcst + rdd.map(x => { + if (x._2._2 == None) { + (x._1, (x._2._1, brdcstCopy.value._2)) + } + else { + x + } + }) + }) + /* 
.map(x => { + if (x._2._2 == None) { + println("joined broadcast = " + brdcstCopy.value._2) + (x._1, (x._2._1, brdcstCopy.value._2)) + } + else { + x + } + })*/ + //o.map(x => { println(x) ; x}) + o } val sortedprevMappedStream = keyWithPreviousEvent(parent) @@ -114,56 +147,151 @@ class PatternMatchedDStream[T: ClassTag]( override def parentRememberDuration: Duration = rememberDuration + windowDuration + val applyRegex = (rdd: RDD[(Long, (String, T))]) => { val patternCopy = pattern import scala.collection.mutable.ListBuffer - val stream = rdd.groupBy(_ => "all") - .map(_._2) - .map(x => { - val it = x.iterator - val builder = new scala.collection.mutable.StringBuilder() - val map = scala.collection.mutable.HashMap[Long, Tuple2[Long, T]] () - var curLen = 1L - while (it.hasNext) { - val i = it.next() - builder.append(" " + i._2._1) - map.put(curLen, (i._1, i._2._2)) - curLen += i._2._1.length + 1 - } - (builder.toString(), map) - }) - val tracker = PatternMatchedDStream.getInstance(this.ssc.sparkContext) - - stream.flatMap(y => { - val it = patternCopy.findAllIn(y._1) - val o = ListBuffer[List[T]] () + // var ctr = -1 + // val t = rdd.groupBy(_ => { ctr = ctr + 1; ctr % 3 } ) + /* val builder = new scala.collection.mutable.StringBuilder() + val map = scala.collection.mutable.HashMap[Long, Tuple2[Long, T]]() + var curLen = 1L + val matcher = rdd.map(x => { + builder.append(" " + x._2._1) + map.put(curLen, (x._1, x._2._2)) + curLen += x._2._1.length + 1 + (x._1, x._2._2) + }) */ + + + val temp = rdd.mapPartitionsWithIndex{ (index, itr) => itr.toList.map( + x => (index, x)).iterator } + val zero = new Aggregator[T]() + val t = temp.aggregateByKey(zero)( + (set, v) => set.append(v._2._1, v._2._2), + (set1, set2) => set1 ++= set2) + + t.flatMap(x => { + //println(x._2.builder.toString()) + val it = patternCopy.findAllIn(x._2.builder.toString()) + val o = ListBuffer[List[T]]() for (one <- it) { var len = 0 var ctr = 0 - val list = ListBuffer[T] () + val list = ListBuffer[T]() one.split(" ").map(sub => { - val id = y._2.get(it.start + len) - list += id.get._2 - if (tracker.toString.toLong < id.get._1) { - tracker.setValue(id.get._1) - } + val id = x._2.map.get(it.start + len) + list += id.get len += sub.length + 1 }) o += list.toList } o.toList }) + + + /*val it = patternCopy.findAllIn(builder.toString()) + val o = ListBuffer[List[Long]]() + for (one <- it) { + + var len = 0 + var ctr = 0 + val list = ListBuffer[Long]() + one.split(" ").map(sub => { + val id = map.get(it.start + len) + // list += id.get._2 + list += id.get._1 + len += sub.length + 1 + }) + o += list.toList + } + + val l = o.toList */ + + /* var b = false + rdd.flatMap(x => { + if (!b) { + b = true + val it = patternCopy.findAllIn(builder.toString()) + val o = ListBuffer[List[T]]() + for (one <- it) { + //println(one) + var len = 0 + var ctr = 0 + val list = ListBuffer[T]() + one.split(" ").map(sub => { + val id = map.get(it.start + len) + list += id.get._2 + len += sub.length + 1 + }) + o += list.toList + } + + //println(o.toList) + o.toList + } + else { + List(List()) + } + }) */ } - private def getLastMaxKey(rdds: Seq[RDD[(Long, T)]]): Option[T] = { + + + /* val stream = rdd.groupBy(_ => "all") + /*val f1 = (rd: (Long, (String, T))) => { + rd._1 + } + val stream = rdd.groupBy(f1, new PatternMatchPartitioner(rdd.partitioner.get)) */ + .map(_._2) + .map(x => { + val it = x.iterator + val builder = new scala.collection.mutable.StringBuilder() + val map = scala.collection.mutable.HashMap[Long, Tuple2[Long, T]] () + var curLen = 1L 
+ while (it.hasNext) { + val i = it.next() + builder.append(" " + i._2._1) + map.put(curLen, (i._1, i._2._2)) + curLen += i._2._1.length + 1 + } + (builder.toString(), map) + }) + val tracker = PatternMatchedDStream.getInstance(this.ssc.sparkContext) + + stream.flatMap(y => { + val it = patternCopy.findAllIn(y._1) + val o = ListBuffer[List[T]] () + for (one <- it) { + var len = 0 + var ctr = 0 + val list = ListBuffer[T] () + one.split(" ").map(sub => { + val id = y._2.get(it.start + len) + list += id.get._2 + if (tracker.toString.toLong < id.get._1) { + tracker.setValue(id.get._1) + } + len += sub.length + 1 + }) + o += list.toList + } + o.toList + }) + } */ + + private def getLastMaxKey(rdds: Seq[RDD[(Long, T)]]): (Long, Option[T]) = { if (rdds.length > 0) { (0 to rdds.size-1).map(i => { if (!rdds(rdds.size-1-i).isEmpty()) { - return Some(rdds(rdds.size-1-i).reduce((e1, e2) => { if (e1._1 > e2._1) e1 else e2 })._2) + // return rdds(rdds.size-1-i).reduce((e1, e2) => { if (e1._1 > e2._1) e1 else e2 }) + val r = rdds(rdds.size-1-i).reduce((e1, e2) => { if (e1._1 > e2._1) e1 else e2 }) + return (r._1, Some(r._2)) } }) } - None: Option[T] + (0L, None: Option[T]) + //None: Option[(Long, T)] } override def compute(validTime: Time): Option[RDD[List[T]]] = { @@ -175,9 +303,11 @@ class PatternMatchedDStream[T: ClassTag]( // first get the row with max key in the last window and broadcast it val oldMaxStreamRDDs = maxKeyStream.slice(previousWindow) - val brdcst = ssc.sparkContext.broadcast(getLastMaxKey(oldMaxStreamRDDs)) + //val brdcst = ssc.sparkContext.broadcast(getLastMaxKey(oldMaxStreamRDDs)) + brdcst = ssc.sparkContext.broadcast(getLastMaxKey(oldMaxStreamRDDs)) val rddsInWindow = sortedprevMappedStream.slice(currentWindow) + val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) { logDebug("Using partition aware union for windowing at " + validTime) new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow) @@ -187,7 +317,6 @@ class PatternMatchedDStream[T: ClassTag]( } if (!windowRDD.isEmpty()) { - // we dont want to report old matched patterns in current window if any // so skip to the ID maintained in tracker val tracker = PatternMatchedDStream.getInstance(this.ssc.sparkContext) @@ -198,14 +327,14 @@ class PatternMatchedDStream[T: ClassTag]( } else { 0L } - val newRDD = windowRDD.filter(x => x._1 > shift) + //val newRDD = windowRDD.filter(x => x._1 > shift) - val first = newRDD.first() - val taggedRDD = newRDD.map(x => { + val first = windowRDD.first() + val taggedRDD = windowRDD.map(x => { var isMatch = false var matchName = "NAN" for (predicate <- predicatesCopy if !isMatch) { - val prev = x._2._2 match { + /* val prev = x._2._2 match { case Some(i) => i.asInstanceOf[T] case None => { if (first._1 == x._1) { // this is the first in window @@ -216,14 +345,14 @@ class PatternMatchedDStream[T: ClassTag]( } } } - isMatch = predicate._2(x._2._1, WindowState(first._2._1, x._2._2.getOrElse(prev))) + isMatch = predicate._2(x._2._1, WindowState(first._2._1, x._2._2.getOrElse(prev))) */ + isMatch = predicate._2(x._2._1, WindowState(first._2._1, x._2._2.getOrElse(first._2._1))) if (isMatch) { matchName = predicate._1 } } (x._1, (matchName, x._2._1)) }) - Some(applyRegex(taggedRDD)) } else { @@ -232,3 +361,43 @@ class PatternMatchedDStream[T: ClassTag]( } } +/* +class PatternMatchPartitioner(parent: Partitioner) extends Partitioner { + + override def numPartitions: Int = parent.numPartitions + override def getPartition(key: Any): Int = { + val ret = parent.getPartition(key) + println( 
key + " : " + ret ) + ret + } + // Java equals method to let Spark compare our Partitioner objects + override def equals(other: Any): Boolean = other match { + case dnp: PatternMatchPartitioner => + dnp.numPartitions == numPartitions + case _ => + false + } +} +*/ + +class Aggregator[T: ClassTag](val builder: scala.collection.mutable.StringBuilder, + val map: scala.collection.mutable.HashMap[Long, T] ) extends Serializable { + + var curLen = 1L + + def this() = { this( new scala.collection.mutable.StringBuilder(), + scala.collection.mutable.HashMap[Long, T]() ) } + + def append(in: String, elem: T ): Aggregator[T] = { + builder.append(" " + in) + map.put(curLen, elem) + curLen += in.length + 1 + this + } + + def ++=(other: Aggregator[T]): Aggregator[T] = { + builder.append(other.builder.toString()) + map ++= other.map + new Aggregator(builder, map) + } +} \ No newline at end of file From b16f1389f5c64964e12bb5d29b5f2ce09428cc61 Mon Sep 17 00:00:00 2001 From: mariobriggs Date: Wed, 20 Apr 2016 00:04:24 +0530 Subject: [PATCH 10/13] Support on PairDStream to enable pattern matching by key --- .../spark/streaming/dstream/DStream.scala | 22 -- .../dstream/PairDStreamFunctions.scala | 97 ++++++ .../dstream/PatternMatchedDStream.scala | 304 ++++++------------ 3 files changed, 199 insertions(+), 224 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 7dfb324b249f..60a7e20a6a8a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -857,28 +857,6 @@ abstract class DStream[T: ClassTag] ( ) } - - /** - * - * @param pattern - * @param predicates - * @param windowDuration - * @param slideDuration - * @return - */ - def matchPatternByWindow( - pattern: scala.util.matching.Regex, - predicates: Map[String, (T, WindowState) => Boolean], - windowDuration: Duration, - slideDuration: Duration) - (implicit ord: Ordering[T] = null) - : DStream[List[T]] = ssc.withScope { - new PatternMatchedDStream[T]( - this, pattern, predicates, - windowDuration, slideDuration - ) - } - /** * Return a new DStream by unifying data of another DStream with this DStream. * @param that Another DStream having the same slideDuration as this DStream. diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala index d6ff96e1fc69..e7d9c7a4f94a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala @@ -350,6 +350,103 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)]) ) } + /** + * Return a new DStream that contains the list of individual events that matched the specified + * pattern over a sliding window. Empty DStream is returned if no matches found. The pattern + * search is for events partitioned by the Key K. + * 1. The pattern is a regular expression over the predicate function names + * (keys in the predicates map). + * 2. Each predicate function (value in the predicates map) receives the current event along + * with a WindowState variable that contains the first and immediately prior event in the + * window. It returns a boolean to indicate if the current event matched the predicate condition. + * 3. 
For each event, the predicates map is looped through and each predicate function invoked + * till first one returns true + * + * @param pattern regular expression that represents the composite event being searched for + * @param predicates map of individual predicate names to predicates functions. The pattern + * parameter is a regular expression over these names. + * @param valuesOrderedBy function that returns the attribute which orders the events within a + * key + * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + */ + def patternMatchByKeyAndWindow( + pattern: scala.util.matching.Regex, + predicates: Map[String, (V, WindowState) => Boolean], + valuesOrderedBy: V => Long, // TODO return type to be made a generic type + windowDuration: Duration, + slideDuration: Duration, + numPartitions: Int = ssc.sc.defaultParallelism + ): DStream[(K, List[V])] = ssc.withScope { + patternMatchByKeyAndWindow(pattern, predicates, valuesOrderedBy, + windowDuration, slideDuration, defaultPartitioner(numPartitions) + ) + } + + def patternMatchByKeyAndWindow( + pattern: scala.util.matching.Regex, + predicates: Map[String, (V, WindowState) => Boolean], + valuesOrderedBy: V => Long, // TODO return type to be made a generic type + windowDuration: Duration, + slideDuration: Duration, + partitioner: Partitioner + ): DStream[(K, List[V])] = ssc.withScope { + new KeyedPatternMatchDStream[K, V]( + self, pattern, predicates, valuesOrderedBy, + windowDuration, slideDuration, partitioner + ) + } + + /** + * Return a new DStream that contains the list of individual events that matched the specified + * pattern over a sliding window. Empty DStream is returned if no matches found. The pattern + * search is ordered over the Key K. + * 1. The pattern is a regular expression over the predicate function names + * (keys in the predicates map). + * 2. Each predicate function (value in the predicates map) receives the current event + * along with a WindowState variable that contains the first and immediately prior event + * in the window. It returns a boolean to indicate if the current event matched the predicate + * condition. + * 3. For each event, the predicates map is looped through and each predicate function invoked + * till first one returns true + * + * @param pattern regular expression that represents the composite event being searched for + * @param predicates map of individual predicate names to predicates functions. The pattern + * parameter is a regular expression over these names. 
+ * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + + */ + def patternMatchByWindow( + pattern: scala.util.matching.Regex, + predicates: Map[String, (V, WindowState) => Boolean], + windowDuration: Duration, + slideDuration: Duration, + numPartitions: Int = ssc.sc.defaultParallelism + ): DStream[List[V]] = ssc.withScope { + patternMatchByWindow(pattern, predicates, + windowDuration, slideDuration, defaultPartitioner(numPartitions) + ) + } + + def patternMatchByWindow( + pattern: scala.util.matching.Regex, + predicates: Map[String, (V, WindowState) => Boolean], + windowDuration: Duration, + slideDuration: Duration, + partitioner: Partitioner + ): DStream[List[V]] = ssc.withScope { + new PatternMatchedDStream[K, V]( + self, pattern, predicates, + windowDuration, slideDuration, partitioner + ) + } /** * :: Experimental :: * Return a [[MapWithStateDStream]] by applying a function to every key-value element of diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PatternMatchedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PatternMatchedDStream.scala index 4615c63a99cf..b16abed7718e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PatternMatchedDStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PatternMatchedDStream.scala @@ -47,13 +47,15 @@ object PatternMatchedDStream { case class WindowState(first: Any, prev: Any) {} private[streaming] -class PatternMatchedDStream[T: ClassTag]( - parent: DStream[T], - pattern: scala.util.matching.Regex, - predicates: Map[String, (T, WindowState) => Boolean], - _windowDuration: Duration, - _slideDuration: Duration - ) extends DStream[List[T]](parent.ssc) { +class PatternMatchedDStream[K: ClassTag, V: ClassTag]( + parent: DStream[(K, V)], + pattern: scala.util.matching.Regex, + predicates: Map[String, (V, WindowState) => Boolean], + _windowDuration: Duration, + _slideDuration: Duration, + partitioner: Partitioner) + (implicit ord: Ordering[K] + ) extends DStream[List[V]](parent.ssc) { require(_windowDuration.isMultipleOf(parent.slideDuration), "The window duration of PatternMatchedDStream (" + _windowDuration + ") " + @@ -65,236 +67,161 @@ class PatternMatchedDStream[T: ClassTag]( "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")" ) - var brdcst = ssc.sparkContext.broadcast((0L, None: Option[T])) - private def maxTStream(in: DStream[(Long, (T, Option[T]))]) : DStream[(Long, T)] = { - in.reduce((e1, e2) => { - if (e1._1 > e2._1) { - e1 - } else { - e2 - } - }).map(max => (max._1, max._2._1)) - } + var brdcst = ssc.sparkContext.broadcast((0L, None: Option[V])) - private def keyWithPreviousEvent(in: DStream[T]) : DStream[(Long, (T, Option[T]))] = { - /* val keyedRdd = in.transform((rdd, time) => { - val offset = System.nanoTime() - rdd.zipWithIndex().map(i => { - (i._2 + offset, i._1)}) - }) */ - val keyedRdd = in.transform((rdd, time) => { + private def keyWithPreviousEvent(in: DStream[(K, V)]) : DStream[(Long, (V, Option[V]))] = { + val sortedStream = in.transform(rdd => rdd.sortByKey(true, rdd.partitions.size)) + .map{case(k, v) => v} + val keyedRdd = sortedStream.transform((rdd, time) => { val brdcstCopy = brdcst - val n = rdd.zipWithIndex().map(i => { + rdd.zipWithIndex().map(i => { 
(i._2 + brdcstCopy.value._1 + 1, i._1)}) - /* n.mapPartitionsWithIndex{ (index, itr) => { - println(index + " - " + itr.toList) - itr.toList.map(x => (index, x)).iterator }} */ - n }) - val previousEventRdd = keyedRdd.map(x => (x._1 + 1, x._2)) - val o = keyedRdd.leftOuterJoin(previousEventRdd, 3).transform(rdd => rdd.sortByKey()) + val previousEventRdd = keyedRdd.map(x => { (x._1 + 1, x._2)}) + keyedRdd.leftOuterJoin(previousEventRdd, partitioner).transform(rdd => rdd.sortByKey()) .transform(rdd => { val brdcstCopy = brdcst rdd.map(x => { if (x._2._2 == None) { (x._1, (x._2._1, brdcstCopy.value._2)) } - else { - x - } + else { x } }) }) - /* .map(x => { - if (x._2._2 == None) { - println("joined broadcast = " + brdcstCopy.value._2) - (x._1, (x._2._1, brdcstCopy.value._2)) - } - else { - x - } - })*/ - //o.map(x => { println(x) ; x}) - o } - val sortedprevMappedStream = keyWithPreviousEvent(parent) - val maxKeyStream = maxTStream(sortedprevMappedStream) + + private def getMaxIdAndElemStream(in: DStream[(Long, (V, Option[V]))]) : DStream[(Long, V)] = { + in.reduce((e1, e2) => { + if (e1._1 > e2._1) { + e1 + } else { + e2 + } + }).map(max => (max._1, max._2._1)) + } + val maxIdAndElemStream = getMaxIdAndElemStream(sortedprevMappedStream) super.persist(StorageLevel.MEMORY_ONLY_SER) sortedprevMappedStream.persist(StorageLevel.MEMORY_ONLY_SER) - maxKeyStream.persist(StorageLevel.MEMORY_ONLY_SER) + maxIdAndElemStream.persist(StorageLevel.MEMORY_ONLY_SER) def windowDuration: Duration = _windowDuration - override def dependencies: List[DStream[_]] = List(sortedprevMappedStream, maxKeyStream) + override def dependencies: List[DStream[_]] = List(sortedprevMappedStream, maxIdAndElemStream) override def slideDuration: Duration = _slideDuration override val mustCheckpoint = true - override def persist(storageLevel: StorageLevel): DStream[List[T]] = { + override def persist(storageLevel: StorageLevel): DStream[List[V]] = { super.persist(storageLevel) sortedprevMappedStream.persist(storageLevel) - maxKeyStream.persist(storageLevel) + maxIdAndElemStream.persist(storageLevel) this } - override def checkpoint(interval: Duration): DStream[List[T]] = { + override def checkpoint(interval: Duration): DStream[List[V]] = { super.checkpoint(interval) this } override def parentRememberDuration: Duration = rememberDuration + windowDuration + import scala.collection.mutable.ListBuffer - val applyRegex = (rdd: RDD[(Long, (String, T))]) => { + /** + * This algo applies the regex within each partition rather than globally. + * Benefits may include better latency when NO matches across partition boundaries. + * However to account for the latter, need a separate algo. 
+ */ + val applyRegexPerPartition = (rdd: RDD[(Long, (String, V))]) => { val patternCopy = pattern - import scala.collection.mutable.ListBuffer - // var ctr = -1 - // val t = rdd.groupBy(_ => { ctr = ctr + 1; ctr % 3 } ) - /* val builder = new scala.collection.mutable.StringBuilder() - val map = scala.collection.mutable.HashMap[Long, Tuple2[Long, T]]() - var curLen = 1L - val matcher = rdd.map(x => { - builder.append(" " + x._2._1) - map.put(curLen, (x._1, x._2._2)) - curLen += x._2._1.length + 1 - (x._1, x._2._2) - }) */ - - val temp = rdd.mapPartitionsWithIndex{ (index, itr) => itr.toList.map( - x => (index, x)).iterator } - val zero = new Aggregator[T]() + x => (index, x)).iterator } + val zero = new Aggregator[V]() val t = temp.aggregateByKey(zero)( (set, v) => set.append(v._2._1, v._2._2), (set1, set2) => set1 ++= set2) t.flatMap(x => { - //println(x._2.builder.toString()) val it = patternCopy.findAllIn(x._2.builder.toString()) - val o = ListBuffer[List[T]]() + val o = ListBuffer[List[V]]() for (one <- it) { var len = 0 var ctr = 0 - val list = ListBuffer[T]() + val list = ListBuffer[V]() one.split(" ").map(sub => { val id = x._2.map.get(it.start + len) list += id.get + // Todo not implemented window break when match found. len += sub.length + 1 }) o += list.toList } o.toList }) + } - - /*val it = patternCopy.findAllIn(builder.toString()) - val o = ListBuffer[List[Long]]() - for (one <- it) { - - var len = 0 - var ctr = 0 - val list = ListBuffer[Long]() - one.split(" ").map(sub => { - val id = map.get(it.start + len) - // list += id.get._2 - list += id.get._1 - len += sub.length + 1 - }) - o += list.toList - } - - val l = o.toList */ - - /* var b = false - rdd.flatMap(x => { - if (!b) { - b = true - val it = patternCopy.findAllIn(builder.toString()) - val o = ListBuffer[List[T]]() - for (one <- it) { - //println(one) - var len = 0 - var ctr = 0 - val list = ListBuffer[T]() - one.split(" ").map(sub => { - val id = map.get(it.start + len) - list += id.get._2 - len += sub.length + 1 - }) - o += list.toList + /** + * This algo applies the regex globally by simply moving ALL + * data to a single partition. 
+ */ + val applyRegex = (rdd: RDD[(Long, (String, V))]) => { + val patternCopy = pattern + val stream = rdd.groupBy(_ => "all") + .map(_._2) + .map(x => { + val it = x.iterator + val builder = new scala.collection.mutable.StringBuilder() + val map = scala.collection.mutable.HashMap[Long, Tuple2[Long, V]] () + var curLen = 1L + while (it.hasNext) { + val i = it.next() + builder.append(" " + i._2._1) + map.put(curLen, (i._1, i._2._2)) + curLen += i._2._1.length + 1 } + (builder.toString(), map) + }) + val tracker = PatternMatchedDStream.getInstance(this.ssc.sparkContext) - //println(o.toList) - o.toList - } - else { - List(List()) + stream.flatMap(y => { + val it = patternCopy.findAllIn(y._1) + val o = ListBuffer[List[V]] () + for (one <- it) { + var len = 0 + var ctr = 0 + val list = ListBuffer[V] () + one.split(" ").map(sub => { + val id = y._2.get(it.start + len) + list += id.get._2 + // publish last seen match, so that dont repeat in a sliding window + if (tracker.toString.toLong < id.get._1) { + tracker.setValue(id.get._1) + } + len += sub.length + 1 + }) + o += list.toList } - }) */ + o.toList + }) } - - - /* val stream = rdd.groupBy(_ => "all") - /*val f1 = (rd: (Long, (String, T))) => { - rd._1 - } - val stream = rdd.groupBy(f1, new PatternMatchPartitioner(rdd.partitioner.get)) */ - .map(_._2) - .map(x => { - val it = x.iterator - val builder = new scala.collection.mutable.StringBuilder() - val map = scala.collection.mutable.HashMap[Long, Tuple2[Long, T]] () - var curLen = 1L - while (it.hasNext) { - val i = it.next() - builder.append(" " + i._2._1) - map.put(curLen, (i._1, i._2._2)) - curLen += i._2._1.length + 1 - } - (builder.toString(), map) - }) - val tracker = PatternMatchedDStream.getInstance(this.ssc.sparkContext) - - stream.flatMap(y => { - val it = patternCopy.findAllIn(y._1) - val o = ListBuffer[List[T]] () - for (one <- it) { - var len = 0 - var ctr = 0 - val list = ListBuffer[T] () - one.split(" ").map(sub => { - val id = y._2.get(it.start + len) - list += id.get._2 - if (tracker.toString.toLong < id.get._1) { - tracker.setValue(id.get._1) - } - len += sub.length + 1 - }) - o += list.toList - } - o.toList - }) - } */ - - private def getLastMaxKey(rdds: Seq[RDD[(Long, T)]]): (Long, Option[T]) = { + private def getLastMaxIdAndElem(rdds: Seq[RDD[(Long, V)]]): (Long, Option[V]) = { if (rdds.length > 0) { (0 to rdds.size-1).map(i => { if (!rdds(rdds.size-1-i).isEmpty()) { - // return rdds(rdds.size-1-i).reduce((e1, e2) => { if (e1._1 > e2._1) e1 else e2 }) val r = rdds(rdds.size-1-i).reduce((e1, e2) => { if (e1._1 > e2._1) e1 else e2 }) return (r._1, Some(r._2)) } }) } - (0L, None: Option[T]) - //None: Option[(Long, T)] + (0L, None: Option[V]) } - override def compute(validTime: Time): Option[RDD[List[T]]] = { + override def compute(validTime: Time): Option[RDD[List[V]]] = { val predicatesCopy = predicates val patternCopy = pattern; val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime) @@ -302,9 +229,9 @@ class PatternMatchedDStream[T: ClassTag]( // first get the row with max key in the last window and broadcast it - val oldMaxStreamRDDs = maxKeyStream.slice(previousWindow) - //val brdcst = ssc.sparkContext.broadcast(getLastMaxKey(oldMaxStreamRDDs)) - brdcst = ssc.sparkContext.broadcast(getLastMaxKey(oldMaxStreamRDDs)) + // IMP: Dont touch current window till after this broadcast + val oldMaxStreamRDDs = maxIdAndElemStream.slice(previousWindow) + brdcst = ssc.sparkContext.broadcast(getLastMaxIdAndElem(oldMaxStreamRDDs)) val 
rddsInWindow = sortedprevMappedStream.slice(currentWindow) @@ -327,25 +254,14 @@ class PatternMatchedDStream[T: ClassTag]( } else { 0L } - //val newRDD = windowRDD.filter(x => x._1 > shift) + val newRDD = windowRDD.filter(x => x._1 > shift) - val first = windowRDD.first() - val taggedRDD = windowRDD.map(x => { + // loop through and get the predicate string for each event. + val first = newRDD.first() + val taggedRDD = newRDD.map(x => { var isMatch = false var matchName = "NAN" for (predicate <- predicatesCopy if !isMatch) { - /* val prev = x._2._2 match { - case Some(i) => i.asInstanceOf[T] - case None => { - if (first._1 == x._1) { // this is the first in window - first._2._1 - } - else { // this is a batchInterval boundary - brdcst.value.get - } - } - } - isMatch = predicate._2(x._2._1, WindowState(first._2._1, x._2._2.getOrElse(prev))) */ isMatch = predicate._2(x._2._1, WindowState(first._2._1, x._2._2.getOrElse(first._2._1))) if (isMatch) { matchName = predicate._1 @@ -361,27 +277,11 @@ class PatternMatchedDStream[T: ClassTag]( } } -/* -class PatternMatchPartitioner(parent: Partitioner) extends Partitioner { - - override def numPartitions: Int = parent.numPartitions - override def getPartition(key: Any): Int = { - val ret = parent.getPartition(key) - println( key + " : " + ret ) - ret - } - // Java equals method to let Spark compare our Partitioner objects - override def equals(other: Any): Boolean = other match { - case dnp: PatternMatchPartitioner => - dnp.numPartitions == numPartitions - case _ => - false - } -} -*/ - -class Aggregator[T: ClassTag](val builder: scala.collection.mutable.StringBuilder, - val map: scala.collection.mutable.HashMap[Long, T] ) extends Serializable { +private[streaming] +class Aggregator[T: ClassTag]( + val builder: scala.collection.mutable.StringBuilder, + val map: scala.collection.mutable.HashMap[Long, T] + ) extends Serializable { var curLen = 1L From 8e5a5b28b3872f4f6620eb5bbf86aab84d83bfcd Mon Sep 17 00:00:00 2001 From: mariobriggs Date: Wed, 20 Apr 2016 00:13:21 +0530 Subject: [PATCH 11/13] missed file KeyedPatternMatchDStrean --- .../dstream/KeyedPatternMatchDStream.scala | 214 ++++++++++++++++++ 1 file changed, 214 insertions(+) create mode 100644 streaming/src/main/scala/org/apache/spark/streaming/dstream/KeyedPatternMatchDStream.scala diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KeyedPatternMatchDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KeyedPatternMatchDStream.scala new file mode 100644 index 000000000000..23ba383858e5 --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KeyedPatternMatchDStream.scala @@ -0,0 +1,214 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.rdd.{CoGroupedRDD, PartitionerAwareUnionRDD, RDD, UnionRDD} +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Duration, Interval, Time} +import org.apache.spark.{Accumulator, Partitioner, SparkContext} + + + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + + + +private[streaming] +class KeyedPatternMatchDStream[K: ClassTag, V: ClassTag]( + parent: DStream[(K, V)], + pattern: scala.util.matching.Regex, + predicates: Map[String, (V, WindowState) => Boolean], + valuesOrderedBy: V => Long, + _windowDuration: Duration, + _slideDuration: Duration, + partitioner: Partitioner + ) extends DStream[(K, List[V])](parent.ssc) { + + require(_windowDuration.isMultipleOf(parent.slideDuration), + "The window duration of PatternMatchedDStream (" + _windowDuration + ") " + + "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")" + ) + + require(_slideDuration.isMultipleOf(parent.slideDuration), + "The slide duration of PatternMatchedDStream (" + _slideDuration + ") " + + "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")" + ) + + + private def aggregateByKey(in: DStream[(K, V)]) : DStream[(K, KeyedWithPrev[V])] = { + val aggregated = in.transform(rdd => { + val zero = new KeyedAggregator[V]() + rdd.aggregateByKey(zero)( + (set, v) => set.append(v), + (set1, set2) => set1 ++= set2) + }) + aggregated.transform(rdd => { + val valuesOrderedByCopy = valuesOrderedBy + rdd.mapValues(x => { + x.sortedWithPrev(valuesOrderedByCopy) + }) + }) + } + + val keyedWithPrevStream = aggregateByKey(parent) + + super.persist(StorageLevel.MEMORY_ONLY_SER) + keyedWithPrevStream.persist(StorageLevel.MEMORY_ONLY_SER) + + def windowDuration: Duration = _windowDuration + + override def dependencies: List[DStream[_]] = List(keyedWithPrevStream) + + override def slideDuration: Duration = _slideDuration + + override val mustCheckpoint = true + + override def persist(storageLevel: StorageLevel): DStream[(K, List[V])] = { + super.persist(storageLevel) + keyedWithPrevStream.persist(storageLevel) + this + } + + override def checkpoint(interval: Duration): DStream[(K, List[V])] = { + super.checkpoint(interval) + this + } + + override def parentRememberDuration: Duration = rememberDuration + windowDuration + + override def compute(validTime: Time): Option[RDD[(K, List[V])]] = { + + val predicatesCopy = predicates + val patternCopy = pattern; + val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime) + val previousWindow = currentWindow - slideDuration + + val newRDDs = keyedWithPrevStream.slice(previousWindow.endTime + parent.slideDuration, + currentWindow.endTime) + + val lastValidRDDs = + keyedWithPrevStream.slice(currentWindow.beginTime, previousWindow.endTime) + + + // Make the list of RDDs that needs to cogrouped together for pattern matching + val allRDDs = new ArrayBuffer[RDD[(K, KeyedWithPrev[V])]]() ++= lastValidRDDs ++= newRDDs + + + // Cogroup the reduced RDDs and merge the reduced values + val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]], + partitioner) + + val totalValues = lastValidRDDs.size + newRDDs.size + + val mergeValues = (arrayOfValues: + Array[org.apache.spark.util.collection.CompactBuffer[KeyedWithPrev[V]]]) => { + + val compactBuffervalues = + (0 to totalValues-1).map(i => 
arrayOfValues(i)).filter(!_.isEmpty).map(_.head) + val values = compactBuffervalues.flatMap(y => y.list) + + import scala.collection.mutable.ListBuffer + if (!values.isEmpty) { + val minVal = values(0) + val pattern = values.map(x => { + var isMatch = false + var matchName = "NAN" + for (predicate <- predicatesCopy if !isMatch) { + isMatch = predicate._2(x._1, WindowState(minVal._1, x._2.getOrElse(minVal._1))) + if (isMatch) { + matchName = predicate._1 + } + } + (matchName, x._1) + }) + + val builder = new scala.collection.mutable.StringBuilder() + val map = scala.collection.mutable.HashMap[Long, V]() + var curLen = 1L + for (i <- pattern.iterator) { + builder.append(" " + i._1) + map.put(curLen, i._2) + curLen += i._1.length + 1 + } + + val matchIt = patternCopy.findAllIn(builder.toString()) + val o = ListBuffer[List[V]]() + for (one <- matchIt) { + var len = 0 + var ctr = 0 + val list = ListBuffer[V]() + one.split(" ").map(sub => { + val id = map.get(matchIt.start + len) + list += id.get + len += sub.length + 1 + }) + o += list.toList + } + o.toList.asInstanceOf[List[V]] + } + else { + ListBuffer[List[V]]().toList.asInstanceOf[List[V]] + } + } + + val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K, + Array[org.apache.spark.util.collection.CompactBuffer[KeyedWithPrev[V]]])]] + .mapValues(mergeValues) + + Some(mergedValuesRDD.filter{case(k, v) => !v.isEmpty }) + } +} + +import scala.collection.mutable.ListBuffer + +private[streaming] +case class KeyedWithPrev[V: ClassTag](val list: List[(V, Option[V])]) + +private[streaming] +class KeyedAggregator[V: ClassTag](val list: ListBuffer[V]) extends Serializable { + + def this() = { this(ListBuffer[V]() ) } + + def append(in: V): KeyedAggregator[V] = { + list += in + this + } + + def ++=(other: KeyedAggregator[V]): KeyedAggregator[V] = { + new KeyedAggregator(list ++= other.list) + } + + def sortedWithPrev(f: V => Long) : KeyedWithPrev[V] = { + val sortedList = list.sortBy(e => f(e)) + val it = sortedList.iterator + + var start = true + val n = sortedList.map(x => { + (x, if (start) { + start = false + None: Option[V] + } else { + Some(it.next()) + }) + }) + KeyedWithPrev[V](n.toList) + } +} + From 162ffbe39e41e958bbf31bc68e0f5cbe2d68ec78 Mon Sep 17 00:00:00 2001 From: mariobriggs Date: Wed, 20 Apr 2016 10:11:19 +0530 Subject: [PATCH 12/13] CEP samples using DirectKafkaInputDStream --- .../kafka/DirectKafkaStreamSuite.scala | 144 ++++++++++++++++++ 1 file changed, 144 insertions(+) diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala index f14ff6705fd9..cfe3c0f33e0f 100644 --- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala +++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala @@ -521,3 +521,147 @@ private[streaming] class ConstantRateController(id: Int, estimator: RateEstimato override def publish(rate: Long): Unit = () override def getLatestRate(): Long = rate } + +/** + * Temporarily added some samples for the CEP API's here. 
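 * The samples expect Kafka messages shaped roughly like "sym:IBM,price:100,ts:1"
 * (an illustrative format inferred from the split("[,:]") parsing below), so that
 * index 1 holds the symbol, index 3 the price and index 5 the timestamp.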
+ * TODO: add test cases in streaming/src/test/scala/CepSuite.scala + */ + +import org.apache.spark.streaming.dstream.WindowState + +object PatternMatchByKey { + + import org.apache.spark.streaming._ + import org.apache.spark.Partitioner + + def main(args: Array[String]) { + val brokers = args(0) + val topics = args(1) + val checkpointDir = args(2) + + + // Create context with 2 second batch interval + val sparkConf = new SparkConf().setMaster("local[*]").setAppName("PatternMatchByKey") + val ssc = new StreamingContext(sparkConf, Seconds(6)) + ssc.checkpoint(checkpointDir) + + // Create direct kafka stream with brokers and topics + val topicsSet = topics.split(",").toSet + val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, + "auto.offset.reset" -> "smallest") + + case class Tick(symbol: String, price: Int, ts: Long) + + // Read a stream of message from kafka + val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( + ssc, kafkaParams, topicsSet) + + // Get each message and put into a Tick object + val ticks = messages.map(_._2) + .map(_.split("[,:]")).map(p => { + Tick(p(1), p(3).trim.toInt, p(5).trim.toLong) + }) + + // put into a KV by compName and Timetamp + val kvTicks = ticks.map(t => (t.symbol, t)) + kvTicks.cache() + + + // define rise, drop & deep predicates + def rise(in: Tick, ew: WindowState): Boolean = { + in.price > ew.first.asInstanceOf[Tick].price && + in.price >= ew.prev.asInstanceOf[Tick].price + } + def drop(in: Tick, ew: WindowState): Boolean = { + in.price >= ew.first.asInstanceOf[Tick].price && + in.price < ew.prev.asInstanceOf[Tick].price + } + def deep(in: Tick, ew: WindowState): Boolean = { + in.price < ew.first.asInstanceOf[Tick].price && + in.price < ew.prev.asInstanceOf[Tick].price + } + + // map the predicates to their state names + val predicateMapping: Map[String, (Tick, WindowState) => Boolean] = + Map("rise" -> rise, "drop" -> drop, "deep" -> deep) + + val matches = kvTicks.patternMatchByKeyAndWindow("rise drop [rise ]+ deep".r, + predicateMapping, (in: Tick) => in.ts, Seconds(12), Seconds(6)) + + matches.print() + + ssc.start() + ssc.awaitTermination() + + + // Start the computation + + } +} + +object PatternMatch { + + import org.apache.spark.streaming._ + import org.apache.spark.Partitioner + + def main(args: Array[String]) { + val brokers = args(0) + val topics = args(1) + val checkpointDir = args(2) + + + // Create context with 2 second batch interval + val sparkConf = new SparkConf().setMaster("local[*]").setAppName("PatternMatch") + val ssc = new StreamingContext(sparkConf, Seconds(6)) + ssc.checkpoint(checkpointDir) + + // Create direct kafka stream with brokers and topics + val topicsSet = topics.split(",").toSet + val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers, + "auto.offset.reset" -> "smallest") + + case class Tick(symbol: String, price: Int, ts: Long) + + // Read a stream of message from kafka + val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder]( + ssc, kafkaParams, topicsSet) + + // Get each message and put into a Tick object + val ticks = messages.map(_._2) + .map(_.split("[,:]")).map(p => { + Tick(p(1), p(3).trim.toInt, p(5).trim.toLong) + }) + + // put into a KV by compName and Timetamp + val kvTicks = ticks.map(t => (t.ts, t)) + + // define rise, drop & deep predicates + def rise(in: Tick, ew: WindowState): Boolean = { + in.price > ew.first.asInstanceOf[Tick].price && + in.price >= ew.prev.asInstanceOf[Tick].price + } + 
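    // WindowState hands each predicate the first event of the current window
    // ("first") and the event just before the current one ("prev"). Illustrative
    // check: with prices 100, 102, 105 the tick at 105 is a "rise"
    // (105 > 100 && 105 >= 102), and a following tick at 103 is a "drop"
    // (103 >= 100 && 103 < 105).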
def drop(in: Tick, ew: WindowState): Boolean = { + in.price >= ew.first.asInstanceOf[Tick].price && + in.price < ew.prev.asInstanceOf[Tick].price + } + def deep(in: Tick, ew: WindowState): Boolean = { + in.price < ew.first.asInstanceOf[Tick].price && + in.price < ew.prev.asInstanceOf[Tick].price + } + + // map the predicates to their state names + val predicateMapping: Map[String, (Tick, WindowState) => Boolean] = + Map("rise" -> rise, "drop" -> drop, "deep" -> deep) + + val matches = kvTicks.patternMatchByWindow("rise drop [rise ]+ deep".r, + predicateMapping, Seconds(12), Seconds(6)) + + matches.print() + + ssc.start() + ssc.awaitTermination() + + // Start the computation + + } +} From 3138c1882ea4a5ccc1b174f4df78620e000b0ad2 Mon Sep 17 00:00:00 2001 From: sachin aggarwal Date: Wed, 20 Apr 2016 11:11:38 +0530 Subject: [PATCH 13/13] undo Dstream Changes --- .../scala/org/apache/spark/streaming/dstream/DStream.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala index 60a7e20a6a8a..eb7b64eaf497 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala @@ -20,13 +20,12 @@ package org.apache.spark.streaming.dstream import java.io.{IOException, ObjectInputStream, ObjectOutputStream} - import scala.collection.mutable.HashMap import scala.language.implicitConversions import scala.reflect.ClassTag import scala.util.matching.Regex -import org.apache.spark.{HashPartitioner, SparkContext, SparkException} +import org.apache.spark.{SparkContext, SparkException} import org.apache.spark.internal.Logging import org.apache.spark.rdd.{BlockRDD, PairRDDFunctions, RDD, RDDOperationScope} import org.apache.spark.storage.StorageLevel
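
A minimal way to try the keyless API locally, without a Kafka broker, is to drive it from a queue-backed stream. The sketch below mirrors the PatternMatch sample above and assumes the patternMatchByWindow extension and WindowState type introduced by these patches; the object name, checkpoint path, queue-based input and Tick prices are illustrative, not part of the patch set.

import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.dstream.WindowState

object PatternMatchLocalSketch {

  case class Tick(symbol: String, price: Int, ts: Long)

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("PatternMatchLocalSketch")
    val ssc = new StreamingContext(sparkConf, Seconds(1))
    ssc.checkpoint("/tmp/cep-local-sketch")   // illustrative checkpoint dir

    // One micro-batch of ticks: the predicates below tag them NAN rise drop rise deep,
    // which the pattern "rise drop [rise ]+ deep" should match once per window.
    def batch(base: Long): Seq[(Long, Tick)] =
      Seq(100, 105, 103, 104, 95).zipWithIndex.map { case (price, i) =>
        val t = Tick("ACME", price, base + i)
        (t.ts, t)
      }

    // Queue-backed input instead of Kafka; one RDD is consumed per batch interval.
    val queue = mutable.Queue((1 to 10).map(i => ssc.sparkContext.parallelize(batch(i * 10L))): _*)
    val kvTicks = ssc.queueStream(queue)

    def rise(in: Tick, w: WindowState): Boolean =
      in.price > w.first.asInstanceOf[Tick].price && in.price >= w.prev.asInstanceOf[Tick].price
    def drop(in: Tick, w: WindowState): Boolean =
      in.price >= w.first.asInstanceOf[Tick].price && in.price < w.prev.asInstanceOf[Tick].price
    def deep(in: Tick, w: WindowState): Boolean =
      in.price < w.first.asInstanceOf[Tick].price && in.price < w.prev.asInstanceOf[Tick].price

    val predicateMapping: Map[String, (Tick, WindowState) => Boolean] =
      Map("rise" -> rise, "drop" -> drop, "deep" -> deep)

    // Same call shape as the Kafka-based PatternMatch sample above.
    val matches = kvTicks.patternMatchByWindow("rise drop [rise ]+ deep".r,
      predicateMapping, Seconds(2), Seconds(1))
    matches.print()

    ssc.start()
    ssc.awaitTerminationOrTimeout(15000L)
    ssc.stop(stopSparkContext = true, stopGracefully = false)
  }
}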