Skip to content

Commit 095184d

Browse files
committed
Changed to string
1 parent 772ddbe commit 095184d

File tree

3 files changed

+12
-9
lines changed

3 files changed

+12
-9
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@
1717

1818
package org.apache.spark.sql.execution.streaming
1919

20-
import java.util.UUID
20+
import java.text.SimpleDateFormat
21+
import java.util.{Date, UUID}
2122

2223
import scala.collection.mutable
2324
import scala.collection.JavaConverters._
@@ -78,6 +79,8 @@ trait ProgressReporter extends Logging {
7879
// The timestamp we report an event that has no input data
7980
private var lastNoDataProgressEventTime = Long.MinValue
8081

82+
private val timestampFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'Z'") // ISO8601
83+
8184
@volatile
8285
protected var currentStatus: StreamingQueryStatus = {
8386
new StreamingQueryStatus(
@@ -156,7 +159,7 @@ trait ProgressReporter extends Logging {
156159
id = id,
157160
runId = runId,
158161
name = name,
159-
timestamp = currentTriggerStartTimestamp,
162+
timestamp = timestampFormat.format(new Date(currentTriggerStartTimestamp)),
160163
batchId = currentBatchId,
161164
durationMs = currentDurationsMs.toMap.mapValues(long2Long).asJava,
162165
currentWatermark = offsetSeqMetadata.batchWatermarkMs,

sql/core/src/main/scala/org/apache/spark/sql/streaming/progress.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import org.json4s.JsonDSL._
2929
import org.json4s.jackson.JsonMethods._
3030

3131
import org.apache.spark.annotation.Experimental
32+
import org.apache.spark.sql.catalyst.util.DateTimeUtils
3233

3334
/**
3435
* :: Experimental ::
@@ -76,7 +77,7 @@ class StreamingQueryProgress private[sql](
7677
val id: UUID,
7778
val runId: UUID,
7879
val name: String,
79-
val timestamp: Long,
80+
val timestamp: String,
8081
val batchId: Long,
8182
val durationMs: ju.Map[String, java.lang.Long],
8283
val currentWatermark: Long,
@@ -109,7 +110,7 @@ class StreamingQueryProgress private[sql](
109110
("id" -> JString(id.toString)) ~
110111
("runId" -> JString(runId.toString)) ~
111112
("name" -> JString(name)) ~
112-
("timestamp" -> JInt(timestamp)) ~
113+
("timestamp" -> JString(timestamp)) ~
113114
("numInputRows" -> JInt(numInputRows)) ~
114115
("inputRowsPerSecond" -> safeDoubleToJValue(inputRowsPerSecond)) ~
115116
("processedRowsPerSecond" -> safeDoubleToJValue(processedRowsPerSecond)) ~
@@ -121,7 +122,6 @@ class StreamingQueryProgress private[sql](
121122
("stateOperators" -> JArray(stateOperators.map(_.jsonValue).toList)) ~
122123
("sources" -> JArray(sources.map(_.jsonValue).toList)) ~
123124
("sink" -> sink.jsonValue)
124-
125125
}
126126
}
127127

sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
3838
| "id" : "${testProgress1.id.toString}",
3939
| "runId" : "${testProgress1.runId.toString}",
4040
| "name" : "myName",
41-
| "timestamp" : 1,
41+
| "timestamp" : "2016-12-05T20:54:20.827Z",
4242
| "numInputRows" : 678,
4343
| "inputRowsPerSecond" : 10.0,
4444
| "durationMs" : {
@@ -71,7 +71,7 @@ class StreamingQueryStatusAndProgressSuite extends SparkFunSuite {
7171
| "id" : "${testProgress2.id.toString}",
7272
| "runId" : "${testProgress2.runId.toString}",
7373
| "name" : null,
74-
| "timestamp" : 1,
74+
| "timestamp" : "2016-12-05T20:54:20.827Z",
7575
| "numInputRows" : 678,
7676
| "durationMs" : {
7777
| "total" : 0
@@ -131,7 +131,7 @@ object StreamingQueryStatusAndProgressSuite {
131131
id = UUID.randomUUID,
132132
runId = UUID.randomUUID,
133133
name = "myName",
134-
timestamp = 1L,
134+
timestamp = "2016-12-05T20:54:20.827Z",
135135
batchId = 2L,
136136
durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
137137
currentWatermark = 3L,
@@ -153,7 +153,7 @@ object StreamingQueryStatusAndProgressSuite {
153153
id = UUID.randomUUID,
154154
runId = UUID.randomUUID,
155155
name = null, // should not be present in the json
156-
timestamp = 1L,
156+
timestamp = "2016-12-05T20:54:20.827Z",
157157
batchId = 2L,
158158
durationMs = Map("total" -> 0L).mapValues(long2Long).asJava,
159159
currentWatermark = 3L,

0 commit comments

Comments
 (0)