diff --git a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
index f14ff6705fd9..cfe3c0f33e0f 100644
--- a/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
+++ b/external/kafka/src/test/scala/org/apache/spark/streaming/kafka/DirectKafkaStreamSuite.scala
@@ -521,3 +521,147 @@ private[streaming] class ConstantRateController(id: Int, estimator: RateEstimato
   override def publish(rate: Long): Unit = ()
   override def getLatestRate(): Long = rate
 }
+
+/**
+ * Temporarily added some samples for the CEP APIs here.
+ * TODO: add test cases in streaming/src/test/scala/CepSuite.scala
+ */
+
+import org.apache.spark.streaming.dstream.WindowState
+
+object PatternMatchByKey {
+
+  import org.apache.spark.streaming._
+  import org.apache.spark.Partitioner
+
+  def main(args: Array[String]) {
+    val brokers = args(0)
+    val topics = args(1)
+    val checkpointDir = args(2)
+
+    // Create context with a 6 second batch interval
+    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("PatternMatchByKey")
+    val ssc = new StreamingContext(sparkConf, Seconds(6))
+    ssc.checkpoint(checkpointDir)
+
+    // Create a direct Kafka stream with the given brokers and topics
+    val topicsSet = topics.split(",").toSet
+    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,
+      "auto.offset.reset" -> "smallest")
+
+    case class Tick(symbol: String, price: Int, ts: Long)
+
+    // Read a stream of messages from Kafka
+    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+      ssc, kafkaParams, topicsSet)
+
+    // Parse each message into a Tick object
+    val ticks = messages.map(_._2)
+      .map(_.split("[,:]")).map(p => {
+        Tick(p(1), p(3).trim.toInt, p(5).trim.toLong)
+      })
+
+    // Key the ticks by symbol
+    val kvTicks = ticks.map(t => (t.symbol, t))
+    kvTicks.cache()
+
+    // Define the rise, drop and deep predicates
+    def rise(in: Tick, ew: WindowState): Boolean = {
+      in.price > ew.first.asInstanceOf[Tick].price &&
+        in.price >= ew.prev.asInstanceOf[Tick].price
+    }
+    def drop(in: Tick, ew: WindowState): Boolean = {
+      in.price >= ew.first.asInstanceOf[Tick].price &&
+        in.price < ew.prev.asInstanceOf[Tick].price
+    }
+    def deep(in: Tick, ew: WindowState): Boolean = {
+      in.price < ew.first.asInstanceOf[Tick].price &&
+        in.price < ew.prev.asInstanceOf[Tick].price
+    }
+
+    // Map the predicates to their state names
+    val predicateMapping: Map[String, (Tick, WindowState) => Boolean] =
+      Map("rise" -> rise, "drop" -> drop, "deep" -> deep)
+
+    val matches = kvTicks.patternMatchByKeyAndWindow("rise drop [rise ]+ deep".r,
+      predicateMapping, (in: Tick) => in.ts, Seconds(12), Seconds(6))
+
+    matches.print()
+
+    // Start the computation
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
+
+object PatternMatch {
+
+  import org.apache.spark.streaming._
+  import org.apache.spark.Partitioner
+
+  def main(args: Array[String]) {
+    val brokers = args(0)
+    val topics = args(1)
+    val checkpointDir = args(2)
+
+    // Create context with a 6 second batch interval
+    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("PatternMatch")
+    val ssc = new StreamingContext(sparkConf, Seconds(6))
+    ssc.checkpoint(checkpointDir)
+
+    // Create a direct Kafka stream with the given brokers and topics
+    val topicsSet = topics.split(",").toSet
+    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers,
+      "auto.offset.reset" -> "smallest")
+
+    case class Tick(symbol: String, price: Int, ts: Long)
+
+    // Read a stream of messages from Kafka
+    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
+      ssc, kafkaParams, topicsSet)
+
+    // Parse each message into a Tick object
+    val ticks = messages.map(_._2)
+      .map(_.split("[,:]")).map(p => {
+        Tick(p(1), p(3).trim.toInt, p(5).trim.toLong)
+      })
+
+    // Key the ticks by timestamp
+    val kvTicks = ticks.map(t => (t.ts, t))
+
+    // Define the rise, drop and deep predicates
+    def rise(in: Tick, ew: WindowState): Boolean = {
+      in.price > ew.first.asInstanceOf[Tick].price &&
+        in.price >= ew.prev.asInstanceOf[Tick].price
+    }
+    def drop(in: Tick, ew: WindowState): Boolean = {
+      in.price >= ew.first.asInstanceOf[Tick].price &&
+        in.price < ew.prev.asInstanceOf[Tick].price
+    }
+    def deep(in: Tick, ew: WindowState): Boolean = {
+      in.price < ew.first.asInstanceOf[Tick].price &&
+        in.price < ew.prev.asInstanceOf[Tick].price
+    }
+
+    // Map the predicates to their state names
+    val predicateMapping: Map[String, (Tick, WindowState) => Boolean] =
+      Map("rise" -> rise, "drop" -> drop, "deep" -> deep)
+
+    val matches = kvTicks.patternMatchByWindow("rise drop [rise ]+ deep".r,
+      predicateMapping, Seconds(12), Seconds(6))
+
+    matches.print()
+
+    // Start the computation
+    ssc.start()
+    ssc.awaitTermination()
+  }
+}
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/KeyedPatternMatchDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KeyedPatternMatchDStream.scala
new file mode 100644
index 000000000000..23ba383858e5
--- /dev/null
+++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/KeyedPatternMatchDStream.scala
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.rdd.{CoGroupedRDD, PartitionerAwareUnionRDD, RDD, UnionRDD} +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Duration, Interval, Time} +import org.apache.spark.{Accumulator, Partitioner, SparkContext} + + + +import scala.collection.mutable.ArrayBuffer +import scala.reflect.ClassTag + + + +private[streaming] +class KeyedPatternMatchDStream[K: ClassTag, V: ClassTag]( + parent: DStream[(K, V)], + pattern: scala.util.matching.Regex, + predicates: Map[String, (V, WindowState) => Boolean], + valuesOrderedBy: V => Long, + _windowDuration: Duration, + _slideDuration: Duration, + partitioner: Partitioner + ) extends DStream[(K, List[V])](parent.ssc) { + + require(_windowDuration.isMultipleOf(parent.slideDuration), + "The window duration of PatternMatchedDStream (" + _windowDuration + ") " + + "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")" + ) + + require(_slideDuration.isMultipleOf(parent.slideDuration), + "The slide duration of PatternMatchedDStream (" + _slideDuration + ") " + + "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")" + ) + + + private def aggregateByKey(in: DStream[(K, V)]) : DStream[(K, KeyedWithPrev[V])] = { + val aggregated = in.transform(rdd => { + val zero = new KeyedAggregator[V]() + rdd.aggregateByKey(zero)( + (set, v) => set.append(v), + (set1, set2) => set1 ++= set2) + }) + aggregated.transform(rdd => { + val valuesOrderedByCopy = valuesOrderedBy + rdd.mapValues(x => { + x.sortedWithPrev(valuesOrderedByCopy) + }) + }) + } + + val keyedWithPrevStream = aggregateByKey(parent) + + super.persist(StorageLevel.MEMORY_ONLY_SER) + keyedWithPrevStream.persist(StorageLevel.MEMORY_ONLY_SER) + + def windowDuration: Duration = _windowDuration + + override def dependencies: List[DStream[_]] = List(keyedWithPrevStream) + + override def slideDuration: Duration = _slideDuration + + override val mustCheckpoint = true + + override def persist(storageLevel: StorageLevel): DStream[(K, List[V])] = { + super.persist(storageLevel) + keyedWithPrevStream.persist(storageLevel) + this + } + + override def checkpoint(interval: Duration): DStream[(K, List[V])] = { + super.checkpoint(interval) + this + } + + override def parentRememberDuration: Duration = rememberDuration + windowDuration + + override def compute(validTime: Time): Option[RDD[(K, List[V])]] = { + + val predicatesCopy = predicates + val patternCopy = pattern; + val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime) + val previousWindow = currentWindow - slideDuration + + val newRDDs = keyedWithPrevStream.slice(previousWindow.endTime + parent.slideDuration, + currentWindow.endTime) + + val lastValidRDDs = + keyedWithPrevStream.slice(currentWindow.beginTime, previousWindow.endTime) + + + // Make the list of RDDs that needs to cogrouped together for pattern matching + val allRDDs = new ArrayBuffer[RDD[(K, KeyedWithPrev[V])]]() ++= lastValidRDDs ++= newRDDs + + + // Cogroup the reduced RDDs and merge the reduced values + val cogroupedRDD = new CoGroupedRDD[K](allRDDs.toSeq.asInstanceOf[Seq[RDD[(K, _)]]], + partitioner) + + val totalValues = lastValidRDDs.size + newRDDs.size + + val mergeValues = (arrayOfValues: + Array[org.apache.spark.util.collection.CompactBuffer[KeyedWithPrev[V]]]) => { + + val compactBuffervalues = + (0 to totalValues-1).map(i => 
arrayOfValues(i)).filter(!_.isEmpty).map(_.head) + val values = compactBuffervalues.flatMap(y => y.list) + + import scala.collection.mutable.ListBuffer + if (!values.isEmpty) { + val minVal = values(0) + val pattern = values.map(x => { + var isMatch = false + var matchName = "NAN" + for (predicate <- predicatesCopy if !isMatch) { + isMatch = predicate._2(x._1, WindowState(minVal._1, x._2.getOrElse(minVal._1))) + if (isMatch) { + matchName = predicate._1 + } + } + (matchName, x._1) + }) + + val builder = new scala.collection.mutable.StringBuilder() + val map = scala.collection.mutable.HashMap[Long, V]() + var curLen = 1L + for (i <- pattern.iterator) { + builder.append(" " + i._1) + map.put(curLen, i._2) + curLen += i._1.length + 1 + } + + val matchIt = patternCopy.findAllIn(builder.toString()) + val o = ListBuffer[List[V]]() + for (one <- matchIt) { + var len = 0 + var ctr = 0 + val list = ListBuffer[V]() + one.split(" ").map(sub => { + val id = map.get(matchIt.start + len) + list += id.get + len += sub.length + 1 + }) + o += list.toList + } + o.toList.asInstanceOf[List[V]] + } + else { + ListBuffer[List[V]]().toList.asInstanceOf[List[V]] + } + } + + val mergedValuesRDD = cogroupedRDD.asInstanceOf[RDD[(K, + Array[org.apache.spark.util.collection.CompactBuffer[KeyedWithPrev[V]]])]] + .mapValues(mergeValues) + + Some(mergedValuesRDD.filter{case(k, v) => !v.isEmpty }) + } +} + +import scala.collection.mutable.ListBuffer + +private[streaming] +case class KeyedWithPrev[V: ClassTag](val list: List[(V, Option[V])]) + +private[streaming] +class KeyedAggregator[V: ClassTag](val list: ListBuffer[V]) extends Serializable { + + def this() = { this(ListBuffer[V]() ) } + + def append(in: V): KeyedAggregator[V] = { + list += in + this + } + + def ++=(other: KeyedAggregator[V]): KeyedAggregator[V] = { + new KeyedAggregator(list ++= other.list) + } + + def sortedWithPrev(f: V => Long) : KeyedWithPrev[V] = { + val sortedList = list.sortBy(e => f(e)) + val it = sortedList.iterator + + var start = true + val n = sortedList.map(x => { + (x, if (start) { + start = false + None: Option[V] + } else { + Some(it.next()) + }) + }) + KeyedWithPrev[V](n.toList) + } +} + diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala index d6ff96e1fc69..e7d9c7a4f94a 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PairDStreamFunctions.scala @@ -350,6 +350,103 @@ class PairDStreamFunctions[K, V](self: DStream[(K, V)]) ) } + /** + * Return a new DStream that contains the list of individual events that matched the specified + * pattern over a sliding window. Empty DStream is returned if no matches found. The pattern + * search is for events partitioned by the Key K. + * 1. The pattern is a regular expression over the predicate function names + * (keys in the predicates map). + * 2. Each predicate function (value in the predicates map) receives the current event along + * with a WindowState variable that contains the first and immediately prior event in the + * window. It returns a boolean to indicate if the current event matched the predicate condition. + * 3. 
For each event, the predicates map is iterated over and each predicate function is
+   *    invoked until the first one returns true.
+   *
+   * @param pattern regular expression that represents the composite event being searched for
+   * @param predicates map of individual predicate names to predicate functions. The pattern
+   *                   parameter is a regular expression over these names.
+   * @param valuesOrderedBy function that returns the attribute which orders the events within a
+   *                        key
+   * @param windowDuration width of the window; must be a multiple of this DStream's
+   *                       batching interval
+   * @param slideDuration sliding interval of the window (i.e., the interval after which
+   *                      the new DStream will generate RDDs); must be a multiple of this
+   *                      DStream's batching interval
+   * @param numPartitions number of partitions of each RDD in the new DStream
+   */
+  def patternMatchByKeyAndWindow(
+      pattern: scala.util.matching.Regex,
+      predicates: Map[String, (V, WindowState) => Boolean],
+      valuesOrderedBy: V => Long, // TODO: make the return type generic
+      windowDuration: Duration,
+      slideDuration: Duration,
+      numPartitions: Int = ssc.sc.defaultParallelism
+    ): DStream[(K, List[V])] = ssc.withScope {
+    patternMatchByKeyAndWindow(pattern, predicates, valuesOrderedBy,
+      windowDuration, slideDuration, defaultPartitioner(numPartitions)
+    )
+  }
+
+  def patternMatchByKeyAndWindow(
+      pattern: scala.util.matching.Regex,
+      predicates: Map[String, (V, WindowState) => Boolean],
+      valuesOrderedBy: V => Long, // TODO: make the return type generic
+      windowDuration: Duration,
+      slideDuration: Duration,
+      partitioner: Partitioner
+    ): DStream[(K, List[V])] = ssc.withScope {
+    new KeyedPatternMatchDStream[K, V](
+      self, pattern, predicates, valuesOrderedBy,
+      windowDuration, slideDuration, partitioner
+    )
+  }
+
+  /**
+   * Return a new DStream that contains the list of individual events that matched the specified
+   * pattern over a sliding window. An empty DStream is returned if no matches are found. The
+   * pattern search runs over all events in the window, ordered by the key K.
+   * 1. The pattern is a regular expression over the predicate function names
+   *    (keys in the predicates map).
+   * 2. Each predicate function (value in the predicates map) receives the current event
+   *    along with a WindowState variable that contains the first and immediately prior event
+   *    in the window. It returns a boolean to indicate if the current event matched the
+   *    predicate condition.
+   * 3. For each event, the predicates map is iterated over and each predicate function is
+   *    invoked until the first one returns true.
+   *
+   * @param pattern regular expression that represents the composite event being searched for
+   * @param predicates map of individual predicate names to predicate functions. The pattern
+   *                   parameter is a regular expression over these names.
+ * @param windowDuration width of the window; must be a multiple of this DStream's + * batching interval + * @param slideDuration sliding interval of the window (i.e., the interval after which + * the new DStream will generate RDDs); must be a multiple of this + * DStream's batching interval + + */ + def patternMatchByWindow( + pattern: scala.util.matching.Regex, + predicates: Map[String, (V, WindowState) => Boolean], + windowDuration: Duration, + slideDuration: Duration, + numPartitions: Int = ssc.sc.defaultParallelism + ): DStream[List[V]] = ssc.withScope { + patternMatchByWindow(pattern, predicates, + windowDuration, slideDuration, defaultPartitioner(numPartitions) + ) + } + + def patternMatchByWindow( + pattern: scala.util.matching.Regex, + predicates: Map[String, (V, WindowState) => Boolean], + windowDuration: Duration, + slideDuration: Duration, + partitioner: Partitioner + ): DStream[List[V]] = ssc.withScope { + new PatternMatchedDStream[K, V]( + self, pattern, predicates, + windowDuration, slideDuration, partitioner + ) + } /** * :: Experimental :: * Return a [[MapWithStateDStream]] by applying a function to every key-value element of diff --git a/streaming/src/main/scala/org/apache/spark/streaming/dstream/PatternMatchedDStream.scala b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PatternMatchedDStream.scala new file mode 100644 index 000000000000..b16abed7718e --- /dev/null +++ b/streaming/src/main/scala/org/apache/spark/streaming/dstream/PatternMatchedDStream.scala @@ -0,0 +1,303 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.streaming.dstream + +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.rdd.{PartitionerAwareUnionRDD, RDD, UnionRDD} +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{StreamingContext, Duration, Interval, Time} +import org.apache.spark.{HashPartitioner, Accumulator, Partitioner, SparkContext} + +import scala.reflect.ClassTag + +/** + * Created by agsachin on 26/02/16. 
+ */ + +object PatternMatchedDStream { + private var tracker: Accumulator[Long] = null + + def getInstance(sc: SparkContext): Accumulator[Long] = { + if (tracker == null) { + synchronized { + if (tracker == null) { + tracker = sc.accumulator(0L, "PatternMatchedDStream") + } + } + } + tracker + } +} + +case class WindowState(first: Any, prev: Any) {} + +private[streaming] +class PatternMatchedDStream[K: ClassTag, V: ClassTag]( + parent: DStream[(K, V)], + pattern: scala.util.matching.Regex, + predicates: Map[String, (V, WindowState) => Boolean], + _windowDuration: Duration, + _slideDuration: Duration, + partitioner: Partitioner) + (implicit ord: Ordering[K] + ) extends DStream[List[V]](parent.ssc) { + + require(_windowDuration.isMultipleOf(parent.slideDuration), + "The window duration of PatternMatchedDStream (" + _windowDuration + ") " + + "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")" + ) + + require(_slideDuration.isMultipleOf(parent.slideDuration), + "The slide duration of PatternMatchedDStream (" + _slideDuration + ") " + + "must be multiple of the slide duration of parent DStream (" + parent.slideDuration + ")" + ) + + var brdcst = ssc.sparkContext.broadcast((0L, None: Option[V])) + + private def keyWithPreviousEvent(in: DStream[(K, V)]) : DStream[(Long, (V, Option[V]))] = { + val sortedStream = in.transform(rdd => rdd.sortByKey(true, rdd.partitions.size)) + .map{case(k, v) => v} + val keyedRdd = sortedStream.transform((rdd, time) => { + val brdcstCopy = brdcst + rdd.zipWithIndex().map(i => { + (i._2 + brdcstCopy.value._1 + 1, i._1)}) + }) + + val previousEventRdd = keyedRdd.map(x => { (x._1 + 1, x._2)}) + keyedRdd.leftOuterJoin(previousEventRdd, partitioner).transform(rdd => rdd.sortByKey()) + .transform(rdd => { + val brdcstCopy = brdcst + rdd.map(x => { + if (x._2._2 == None) { + (x._1, (x._2._1, brdcstCopy.value._2)) + } + else { x } + }) + }) + } + val sortedprevMappedStream = keyWithPreviousEvent(parent) + + private def getMaxIdAndElemStream(in: DStream[(Long, (V, Option[V]))]) : DStream[(Long, V)] = { + in.reduce((e1, e2) => { + if (e1._1 > e2._1) { + e1 + } else { + e2 + } + }).map(max => (max._1, max._2._1)) + } + val maxIdAndElemStream = getMaxIdAndElemStream(sortedprevMappedStream) + + super.persist(StorageLevel.MEMORY_ONLY_SER) + sortedprevMappedStream.persist(StorageLevel.MEMORY_ONLY_SER) + maxIdAndElemStream.persist(StorageLevel.MEMORY_ONLY_SER) + + def windowDuration: Duration = _windowDuration + + override def dependencies: List[DStream[_]] = List(sortedprevMappedStream, maxIdAndElemStream) + + override def slideDuration: Duration = _slideDuration + + override val mustCheckpoint = true + + override def persist(storageLevel: StorageLevel): DStream[List[V]] = { + super.persist(storageLevel) + sortedprevMappedStream.persist(storageLevel) + maxIdAndElemStream.persist(storageLevel) + this + } + + override def checkpoint(interval: Duration): DStream[List[V]] = { + super.checkpoint(interval) + this + } + + override def parentRememberDuration: Duration = rememberDuration + windowDuration + + import scala.collection.mutable.ListBuffer + + /** + * This algo applies the regex within each partition rather than globally. + * Benefits may include better latency when NO matches across partition boundaries. + * However to account for the latter, need a separate algo. 
+ */ + val applyRegexPerPartition = (rdd: RDD[(Long, (String, V))]) => { + val patternCopy = pattern + val temp = rdd.mapPartitionsWithIndex{ (index, itr) => itr.toList.map( + x => (index, x)).iterator } + val zero = new Aggregator[V]() + val t = temp.aggregateByKey(zero)( + (set, v) => set.append(v._2._1, v._2._2), + (set1, set2) => set1 ++= set2) + + t.flatMap(x => { + val it = patternCopy.findAllIn(x._2.builder.toString()) + val o = ListBuffer[List[V]]() + for (one <- it) { + var len = 0 + var ctr = 0 + val list = ListBuffer[V]() + one.split(" ").map(sub => { + val id = x._2.map.get(it.start + len) + list += id.get + // Todo not implemented window break when match found. + len += sub.length + 1 + }) + o += list.toList + } + o.toList + }) + } + + /** + * This algo applies the regex globally by simply moving ALL + * data to a single partition. + */ + val applyRegex = (rdd: RDD[(Long, (String, V))]) => { + val patternCopy = pattern + val stream = rdd.groupBy(_ => "all") + .map(_._2) + .map(x => { + val it = x.iterator + val builder = new scala.collection.mutable.StringBuilder() + val map = scala.collection.mutable.HashMap[Long, Tuple2[Long, V]] () + var curLen = 1L + while (it.hasNext) { + val i = it.next() + builder.append(" " + i._2._1) + map.put(curLen, (i._1, i._2._2)) + curLen += i._2._1.length + 1 + } + (builder.toString(), map) + }) + val tracker = PatternMatchedDStream.getInstance(this.ssc.sparkContext) + + stream.flatMap(y => { + val it = patternCopy.findAllIn(y._1) + val o = ListBuffer[List[V]] () + for (one <- it) { + var len = 0 + var ctr = 0 + val list = ListBuffer[V] () + one.split(" ").map(sub => { + val id = y._2.get(it.start + len) + list += id.get._2 + // publish last seen match, so that dont repeat in a sliding window + if (tracker.toString.toLong < id.get._1) { + tracker.setValue(id.get._1) + } + len += sub.length + 1 + }) + o += list.toList + } + o.toList + }) + } + + private def getLastMaxIdAndElem(rdds: Seq[RDD[(Long, V)]]): (Long, Option[V]) = { + if (rdds.length > 0) { + (0 to rdds.size-1).map(i => { + if (!rdds(rdds.size-1-i).isEmpty()) { + val r = rdds(rdds.size-1-i).reduce((e1, e2) => { if (e1._1 > e2._1) e1 else e2 }) + return (r._1, Some(r._2)) + } + }) + } + (0L, None: Option[V]) + } + + override def compute(validTime: Time): Option[RDD[List[V]]] = { + val predicatesCopy = predicates + val patternCopy = pattern; + val currentWindow = new Interval(validTime - windowDuration + parent.slideDuration, validTime) + val previousWindow = currentWindow - slideDuration + + + // first get the row with max key in the last window and broadcast it + // IMP: Dont touch current window till after this broadcast + val oldMaxStreamRDDs = maxIdAndElemStream.slice(previousWindow) + brdcst = ssc.sparkContext.broadcast(getLastMaxIdAndElem(oldMaxStreamRDDs)) + + val rddsInWindow = sortedprevMappedStream.slice(currentWindow) + + val windowRDD = if (rddsInWindow.flatMap(_.partitioner).distinct.length == 1) { + logDebug("Using partition aware union for windowing at " + validTime) + new PartitionerAwareUnionRDD(ssc.sc, rddsInWindow) + } else { + logDebug("Using normal union for windowing at " + validTime) + new UnionRDD(ssc.sc, rddsInWindow) + } + + if (!windowRDD.isEmpty()) { + // we dont want to report old matched patterns in current window if any + // so skip to the ID maintained in tracker + val tracker = PatternMatchedDStream.getInstance(this.ssc.sparkContext) + val shift = if (tracker.toString.toLong > 0L) { + val copy = tracker.toString.toLong + tracker.setValue(0L) + copy + } 
else { + 0L + } + val newRDD = windowRDD.filter(x => x._1 > shift) + + // loop through and get the predicate string for each event. + val first = newRDD.first() + val taggedRDD = newRDD.map(x => { + var isMatch = false + var matchName = "NAN" + for (predicate <- predicatesCopy if !isMatch) { + isMatch = predicate._2(x._2._1, WindowState(first._2._1, x._2._2.getOrElse(first._2._1))) + if (isMatch) { + matchName = predicate._1 + } + } + (x._1, (matchName, x._2._1)) + }) + Some(applyRegex(taggedRDD)) + } + else { + None + } + } +} + +private[streaming] +class Aggregator[T: ClassTag]( + val builder: scala.collection.mutable.StringBuilder, + val map: scala.collection.mutable.HashMap[Long, T] + ) extends Serializable { + + var curLen = 1L + + def this() = { this( new scala.collection.mutable.StringBuilder(), + scala.collection.mutable.HashMap[Long, T]() ) } + + def append(in: String, elem: T ): Aggregator[T] = { + builder.append(" " + in) + map.put(curLen, elem) + curLen += in.length + 1 + this + } + + def ++=(other: Aggregator[T]): Aggregator[T] = { + builder.append(other.builder.toString()) + map ++= other.map + new Aggregator(builder, map) + } +} \ No newline at end of file
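
A note on the TODO in the sample code above: the test coverage for streaming/src/test/scala/CepSuite.scala is still missing. A minimal sketch of what such a test driver might look like is given below, assuming a local queue stream in place of Kafka. The object name CepSuiteSketch, the checkpoint path, the input ticks, and the window/slide durations are placeholders; only the patternMatchByWindow call and the WindowState type come from the API added by this patch.

import scala.collection.mutable

import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.WindowState

object CepSuiteSketch {

  case class Tick(symbol: String, price: Int, ts: Long)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("CepSuiteSketch")
    val ssc = new StreamingContext(conf, Seconds(1))
    // The new pattern-match DStreams set mustCheckpoint, so a checkpoint directory is required.
    ssc.checkpoint("/tmp/cep-sketch") // placeholder path

    // Feed a fixed sequence of ticks through a queue stream instead of Kafka,
    // keyed by timestamp as in the PatternMatch sample above.
    val queue = new mutable.Queue[RDD[(Long, Tick)]]()
    val ticks = ssc.queueStream(queue)

    // Two of the predicates from the samples above.
    def rise(in: Tick, w: WindowState): Boolean =
      in.price > w.first.asInstanceOf[Tick].price && in.price >= w.prev.asInstanceOf[Tick].price
    def drop(in: Tick, w: WindowState): Boolean =
      in.price >= w.first.asInstanceOf[Tick].price && in.price < w.prev.asInstanceOf[Tick].price

    val predicates: Map[String, (Tick, WindowState) => Boolean] =
      Map("rise" -> rise, "drop" -> drop)

    // Look for a rise followed by a drop within a 4 second window, sliding every 2 seconds.
    val matches = ticks.patternMatchByWindow("rise drop".r, predicates, Seconds(4), Seconds(2))
    matches.print()

    // One batch of input: a rise to 105 followed by a drop to 101 should produce a match.
    queue += ssc.sparkContext.parallelize(Seq(
      (1L, Tick("AAPL", 100, 1L)),
      (2L, Tick("AAPL", 105, 2L)),
      (3L, Tick("AAPL", 101, 3L))))

    ssc.start()
    ssc.awaitTerminationOrTimeout(10000) // let a few batches run, then stop
    ssc.stop()
  }
}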