Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,19 @@ case class EventTimeStats(var max: Long, var min: Long, var avg: Double, var cou
}

def merge(that: EventTimeStats): Unit = {
this.max = math.max(this.max, that.max)
this.min = math.min(this.min, that.min)
this.count += that.count
this.avg += (that.avg - this.avg) * that.count / this.count
if (that.count == 0) {
// no-op
} else if (this.count == 0) {
this.max = that.max
this.min = that.min
this.count = that.count
this.avg = that.avg
} else {
this.max = math.max(this.max, that.max)
this.min = math.min(this.min, that.min)
this.count += that.count
this.avg += (that.avg - this.avg) * that.count / this.count
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
sqlContext.streams.active.foreach(_.stop())
}

test("EventTimeStats") {
val epsilon = 10E-6
private val epsilon = 10E-6

test("EventTimeStats") {
val stats = EventTimeStats(max = 100, min = 10, avg = 20.0, count = 5)
stats.add(80L)
stats.max should be (100)
Expand All @@ -62,7 +62,6 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
}

test("EventTimeStats: avg on large values") {
val epsilon = 10E-6
val largeValue = 10000000000L // 10B
// Make sure `largeValue` will cause overflow if we use a Long sum to calc avg.
assert(largeValue * largeValue != BigInt(largeValue) * BigInt(largeValue))
Expand All @@ -80,6 +79,33 @@ class EventTimeWatermarkSuite extends StreamTest with BeforeAndAfter with Matche
stats.avg should be ((largeValue + 0.5) +- epsilon)
}

test("EventTimeStats: zero merge zero") {
val stats = EventTimeStats.zero
val stats2 = EventTimeStats.zero
stats.merge(stats2)
stats should be (EventTimeStats.zero)
}

test("EventTimeStats: non-zero merge zero") {
val stats = EventTimeStats(max = 10, min = 1, avg = 5.0, count = 3)
val stats2 = EventTimeStats.zero
stats.merge(stats2)
stats.max should be (10L)
stats.min should be (1L)
stats.avg should be (5.0 +- epsilon)
stats.count should be (3L)
}

test("EventTimeStats: zero merge non-zero") {
val stats = EventTimeStats.zero
val stats2 = EventTimeStats(max = 10, min = 1, avg = 5.0, count = 3)
stats.merge(stats2)
stats.max should be (10L)
stats.min should be (1L)
stats.avg should be (5.0 +- epsilon)
stats.count should be (3L)
}

test("error on bad column") {
val inputData = MemoryStream[Int].toDF()
val e = intercept[AnalysisException] {
Expand Down