Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@

package org.apache.spark.sql.execution.streaming

import java.util.UUID
import java.text.SimpleDateFormat
import java.util.{Date, TimeZone, UUID}

import scala.collection.mutable
import scala.collection.JavaConverters._
Expand Down Expand Up @@ -78,6 +79,9 @@ trait ProgressReporter extends Logging {
// The timestamp we report an event that has no input data
private var lastNoDataProgressEventTime = Long.MinValue

private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to set the timezone before using this format. Otherwise, the time is based on your timezone. E.g.,

scala> new java.util.Date()
res7: java.util.Date = Tue Dec 06 10:48:00 PST 2016

scala> val timestampFormat = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") 
timestampFormat: java.text.SimpleDateFormat = java.text.SimpleDateFormat@5af7aed5

scala> timestampFormat.format(new java.util.Date())
res8: String = 2016-12-06T10:48:09.342Z

scala> val timestampFormat = new java.text.SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") 
timestampFormat: java.text.SimpleDateFormat = java.text.SimpleDateFormat@5af7aed5

scala> timestampFormat.setTimeZone(java.util.TimeZone.getTimeZone("UTC"))

scala> timestampFormat.format(new java.util.Date())
res10: String = 2016-12-06T18:48:20.048Z

timestampFormat.setTimeZone(TimeZone.getTimeZone("UTC"))

@volatile
protected var currentStatus: StreamingQueryStatus = {
new StreamingQueryStatus(
Expand Down Expand Up @@ -156,7 +160,7 @@ trait ProgressReporter extends Logging {
id = id,
runId = runId,
name = name,
timestamp = currentTriggerStartTimestamp,
timestamp = timestampFormat.format(new Date(currentTriggerStartTimestamp)),
batchId = currentBatchId,
durationMs = currentDurationsMs.toMap.mapValues(long2Long).asJava,
currentWatermark = offsetSeqMetadata.batchWatermarkMs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.annotation.Experimental
import org.apache.spark.sql.catalyst.util.DateTimeUtils

/**
* :: Experimental ::
Expand Down Expand Up @@ -76,7 +77,7 @@ class StreamingQueryProgress private[sql](
val id: UUID,
val runId: UUID,
val name: String,
val timestamp: Long,
val timestamp: String,
val batchId: Long,
val durationMs: ju.Map[String, java.lang.Long],
val currentWatermark: Long,
Expand Down Expand Up @@ -109,7 +110,7 @@ class StreamingQueryProgress private[sql](
("id" -> JString(id.toString)) ~
("runId" -> JString(runId.toString)) ~
("name" -> JString(name)) ~
("timestamp" -> JInt(timestamp)) ~
("timestamp" -> JString(timestamp)) ~
("numInputRows" -> JInt(numInputRows)) ~
("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
Expand All @@ -121,7 +122,6 @@ class StreamingQueryProgress private[sql](
("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~
("sources" -> JArray(sources.map(_.jsonValue).toList)) ~
("sink" -> sink.jsonValue)

}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
| "id" : "${testProgress1.id.toString}",
| "runId" : "${testProgress1.runId.toString}",
| "name" : "myName",
| "timestamp" : 1,
| "timestamp" : "2016-12-05T20:54:20.827Z",
| "numInputRows" : 678,
| "inputRowsPerSecond" : 10.0,
| "durationMs" : {
Expand Down Expand Up @@ -71,7 +71,7 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
| "id" : "${testProgress2.id.toString}",
| "runId" : "${testProgress2.runId.toString}",
| "name" : null,
| "timestamp" : 1,
| "timestamp" : "2016-12-05T20:54:20.827Z",
| "numInputRows" : 678,
| "durationMs" : {
| "total" : 0
Expand Down Expand Up @@ -131,7 +131,7 @@ object StreamingQueryStatusAndProgressSuite {
id = UUID.randomUUID,
runId = UUID.randomUUID,
name = "myName",
timestamp = 1L,
timestamp = "2016-12-05T20:54:20.827Z",
batchId = 2L,
durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
currentWatermark = 3L,
Expand All @@ -153,7 +153,7 @@ object StreamingQueryStatusAndProgressSuite {
id = UUID.randomUUID,
runId = UUID.randomUUID,
name = null, // should not be present in the json
timestamp = 1L,
timestamp = "2016-12-05T20:54:20.827Z",
batchId = 2L,
durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
currentWatermark = 3L,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,7 +243,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging {
assert(progress.id === query.id)
assert(progress.name === query.name)
assert(progress.batchId === 0)
assert(progress.timestamp === 100)
assert(progress.timestamp === "1970-01-01T00:00:00.100Z") // 100 ms in UTC
assert(progress.numInputRows === 2)
assert(progress.processedRowsPerSecond === 2.0)

Expand Down