@@ -5,42 +5,34 @@ import cats.implicits._
55import scala .concurrent .duration .{Duration , FiniteDuration }
66
77/** Track statistics over time a fixed size timewindow. */
8- case class TimeSlotStats [K , V : Monoid ] private (
8+ class TimeSlotStats [K , V : Monoid ] private (
99 // Time resolution.
10- slotDuration : FiniteDuration ,
11- slotCount : Int ,
12- // The last written slot.
13- lastIdx : Int ,
14- // Ring buffer of the timestamp of each slot.
15- timeSlots : IndexedSeq [TimeSlotStats .Timestamp ],
16- // Ring buffer of statistics per slot.
17- statSlots : IndexedSeq [Map [K , V ]]
10+ val slotDuration : FiniteDuration ,
11+ // The last written position in the buffer.
12+ val lastIdx : Int ,
13+ // Ring buffer of slots statistics.
14+ val buffer : IndexedSeq [TimeSlotStats .Entry [K , V ]]
1815) {
1916 import TimeSlotStats ._
2017
2118 /** Overall length of the timewindow. */
2219 def duration = slotDuration * slotCount
20+ def slotCount = buffer.size
2321
2422 /** Merge new stats for a given key in the current timestamp. */
2523 def add (key : K , stat : V , timestamp : Timestamp = System .currentTimeMillis): TimeSlotStats [K , V ] = {
26- val currSlot = slotId(timestamp)
27- val lastSlot = timeSlots (lastIdx)
24+ val currSlotId = slotId(timestamp)
25+ val lastEntry = buffer (lastIdx)
2826
29- if (currSlot == lastSlot ) {
27+ if (currSlotId == lastEntry.slotId ) {
3028 // We're still in the same timeslot, so just append stats.
31- val newStats = statSlots(lastIdx) |+| Map (key -> stat)
32- copy(
33- statSlots = statSlots.updated(lastIdx, newStats)
34- )
35- } else if (currSlot > lastSlot) {
29+ val newEntry = lastEntry.add(key, stat)
30+ updated(lastIdx, newEntry)
31+ } else if (currSlotId > lastEntry.slotId) {
3632 // Go to the next slot.
37- val newStats = Map (key -> stat)
3833 val newIdx = succ(lastIdx)
39- copy(
40- lastIdx = newIdx,
41- timeSlots = timeSlots.updated(newIdx, currSlot),
42- statSlots = statSlots.updated(newIdx, newStats)
43- )
34+ val newEntry = Entry (currSlotId, Map (key -> stat))
35+ updated(newIdx, newEntry)
4436 } else {
4537 // Going backwards in time, just ignore it.
4638 this
@@ -49,7 +41,7 @@ case class TimeSlotStats[K, V: Monoid] private (
4941
5042 /** Forget all statistics about a given key. */
5143 def remove (key : K ): TimeSlotStats [K , V ] =
52- copy(statSlots = statSlots .map(_ - key))
44+ updated(lastIdx, buffer .map(_.remove( key) ))
5345
5446 /** Aggregate stats for a key in all slots that are within the duration. */
5547 def get (key : K , timestamp : Timestamp = System .currentTimeMillis): V =
@@ -67,11 +59,11 @@ case class TimeSlotStats[K, V: Monoid] private (
6759 val (start, end) = slotRange(timestamp)
6860
6961 def loop (idx : Int , acc : A ): A = {
70- val t = timeSlots (idx)
71- if (t < start || t > end)
62+ val entry = buffer (idx)
63+ if (entry.slotId < start || entry.slotId > end)
7264 acc
7365 else
74- loop(pred(idx), f(acc, statSlots(idx) ))
66+ loop(pred(idx), f(acc, entry.slotStats ))
7567 }
7668
7769 loop(lastIdx, init)
@@ -82,44 +74,56 @@ case class TimeSlotStats[K, V: Monoid] private (
8274 timestamp - timestamp % slotDuration.toMillis
8375 }
8476
77+ /** The range of time slots based on the current timestamp and the buffer duration. */
8578 private def slotRange (timestamp : Timestamp ): (Timestamp , Timestamp ) = {
86- val endSlot = slotId(timestamp)
87- val startSlot = slotId(timestamp - slotDuration .toMillis * slotCount )
88- startSlot -> endSlot
79+ val end = slotId(timestamp)
80+ val start = slotId(timestamp - duration .toMillis)
81+ start -> end
8982 }
9083
9184 private def succ (idx : Int ): Int = (idx + 1 ) % slotCount
9285 private def pred (idx : Int ): Int = (idx - 1 ) % slotCount
9386
94- private def copy (statSlots : IndexedSeq [Map [K , V ]]): TimeSlotStats [K , V ] =
95- copy(lastIdx, timeSlots, statSlots)
87+ private def updated (
88+ lastIdx : Int ,
89+ entry : Entry [K , V ]
90+ ): TimeSlotStats [K , V ] =
91+ updated(lastIdx, buffer.updated(lastIdx, entry))
9692
97- private def copy (
93+ private def updated (
9894 lastIdx : Int ,
99- timeSlots : IndexedSeq [Timestamp ],
100- statSlots : IndexedSeq [Map [K , V ]]
95+ buffer : IndexedSeq [Entry [K , V ]]
10196 ): TimeSlotStats [K , V ] =
102- new TimeSlotStats [K , V ](slotDuration, slotCount, lastIdx, timeSlots, statSlots )
97+ new TimeSlotStats [K , V ](slotDuration, lastIdx, buffer )
10398}
10499
105100object TimeSlotStats {
106101
107102 // Milliseconds since epoch.
108103 type Timestamp = Long
109104
105+ case class Entry [K , V : Monoid ](
106+ slotId : Timestamp ,
107+ slotStats : Map [K , V ]
108+ ) {
109+ def add (key : K , stat : V ): Entry [K , V ] =
110+ copy(slotStats = slotStats |+| Map (key -> stat))
111+
112+ def remove (key : K ): Entry [K , V ] =
113+ copy(slotStats = slotStats - key)
114+ }
115+
110116 def apply [K , V : Monoid ](
111117 slotDuration : FiniteDuration ,
112118 slotCount : Int
113119 ): Option [TimeSlotStats [K , V ]] =
114120 if (slotDuration == Duration .Zero || slotCount <= 0 ) None
115121 else
116122 Some {
117- TimeSlotStats [K , V ](
123+ new TimeSlotStats [K , V ](
118124 slotDuration,
119- slotCount,
120125 lastIdx = slotCount - 1 , // So the first slot we fill will move to 0.
121- timeSlots = IndexedSeq .fill(slotCount)(- 1L ),
122- statSlots = IndexedSeq .fill(slotCount)(Map .empty[K , V ])
126+ buffer = IndexedSeq .fill(slotCount)(Entry (- 1L , Map .empty[K , V ]))
123127 )
124128 }
125129}
0 commit comments