@@ -25,15 +25,16 @@ import scala.xml.Node
2525import org .apache .spark .Logging
2626import org .apache .spark .ui ._
2727import org .apache .spark .ui .UIUtils ._
28- import org .apache .spark .util .Distribution
28+ import org .apache .spark .streaming .scheduler .ReceiverInfo
29+
2930
3031/** Page for Spark Web UI that shows statistics of a streaming job */
3132private [ui] class StreamingPage (parent : StreamingTab )
3233 extends WebUIPage (" " ) with Logging {
3334
3435 private val listener = parent.listener
3536 private val startTime = Calendar .getInstance().getTime()
36- private val emptyCell = " -"
37+ private val empty = " -"
3738
3839 /** Render the page */
3940 def render (request : HttpServletRequest ): Seq [Node ] = {
@@ -45,6 +46,19 @@ private[ui] class StreamingPage(parent: StreamingTab)
4546 UIUtils .headerSparkPage(" Streaming" , content, parent, Some (5000 ))
4647 }
4748
49+ private val batchStatsTable : UITable [(String , Option [Long ], Option [Seq [Double ]])] = {
50+ val builder = new UITableBuilder [(String , Option [Long ], Option [Seq [Double ]])]()
51+ import builder ._
52+ col(" Metric" ) { _._1 }
53+ optDurationCol(" Last batch" ) { _._2 }
54+ optDurationCol(" Minimum" ) { _._3.map(_(0 ).toLong) }
55+ optDurationCol(" 25th percentile" ) { _._3.map(_(1 ).toLong) }
56+ optDurationCol(" Median" ) { _._3.map(_(2 ).toLong) }
57+ optDurationCol(" 75th percentile" ) { _._3.map(_(3 ).toLong) }
58+ optDurationCol(" Maximum" ) { _._3.map(_(4 ).toLong) }
59+ build
60+ }
61+
4862 /** Generate basic stats of the streaming program */
4963 private def generateBasicStats (): Seq [Node ] = {
5064 val timeSinceStart = System .currentTimeMillis() - startTime.getTime
@@ -75,37 +89,45 @@ private[ui] class StreamingPage(parent: StreamingTab)
7589 val receivedRecordDistributions = listener.receivedRecordsDistributions
7690 val lastBatchReceivedRecord = listener.lastReceivedBatchRecords
7791 val table = if (receivedRecordDistributions.size > 0 ) {
78- val headerRow = Seq (
79- " Receiver" ,
80- " Status" ,
81- " Location" ,
82- " Records in last batch\n [" + formatDate(Calendar .getInstance().getTime()) + " ]" ,
83- " Minimum rate\n [records/sec]" ,
84- " Median rate\n [records/sec]" ,
85- " Maximum rate\n [records/sec]" ,
86- " Last Error"
87- )
88- val dataRows = (0 until listener.numReceivers).map { receiverId =>
89- val receiverInfo = listener.receiverInfo(receiverId)
90- val receiverName = receiverInfo.map(_.name).getOrElse(s " Receiver- $receiverId" )
91- val receiverActive = receiverInfo.map { info =>
92- if (info.active) " ACTIVE" else " INACTIVE"
93- }.getOrElse(emptyCell)
94- val receiverLocation = receiverInfo.map(_.location).getOrElse(emptyCell)
95- val receiverLastBatchRecords = formatNumber(lastBatchReceivedRecord(receiverId))
96- val receivedRecordStats = receivedRecordDistributions(receiverId).map { d =>
97- d.getQuantiles(Seq (0.0 , 0.5 , 1.0 )).map(r => formatNumber(r.toLong))
98- }.getOrElse {
99- Seq (emptyCell, emptyCell, emptyCell, emptyCell, emptyCell)
92+ val tableRenderer : UITable [(Int , Option [ReceiverInfo ])] = {
93+ val builder = new UITableBuilder [(Int , Option [ReceiverInfo ])]()
94+ import builder ._
95+ col(" Receiver" ) { case (receiverId, receiverInfo) =>
96+ receiverInfo.map(_.name).getOrElse(s " Receiver- $receiverId" )
97+ }
98+ col(" Status" ) { case (_, receiverInfo) =>
99+ receiverInfo.map { info => if (info.active) " ACTIVE" else " INACTIVE" }.getOrElse(empty)
100+ }
101+ col(" Location" ) { case (_, receiverInfo) => receiverInfo.map(_.location).getOrElse(empty) }
102+ col(" Records in last batch\n [" + formatDate(Calendar .getInstance().getTime()) + " ]" ) {
103+ case (receiverId, _) => formatNumber(lastBatchReceivedRecord(receiverId))
100104 }
101- val receiverLastError = listener.receiverInfo(receiverId).map { info =>
102- val msg = s " ${info.lastErrorMessage} - ${info.lastError}"
103- if (msg.size > 100 ) msg.take(97 ) + " ..." else msg
104- }.getOrElse(emptyCell)
105- Seq (receiverName, receiverActive, receiverLocation, receiverLastBatchRecords) ++
106- receivedRecordStats ++ Seq (receiverLastError)
105+ col(" Minimum rate\n [records/sec]" ) {
106+ case (receiverId, _) => receivedRecordDistributions(receiverId).map {
107+ _.getQuantiles(Seq (0.0 )).map(formatNumber).head
108+ }.getOrElse(empty)
109+ }
110+ col(" Median rate\n [records/sec]" ) {
111+ case (receiverId, _) => receivedRecordDistributions(receiverId).map {
112+ _.getQuantiles(Seq (0.5 )).map(formatNumber).head
113+ }.getOrElse(empty)
114+ }
115+ col(" Maximum rate\n [records/sec]" ) {
116+ case (receiverId, _) => receivedRecordDistributions(receiverId).map {
117+ _.getQuantiles(Seq (1.0 )).map(formatNumber).head
118+ }.getOrElse(empty)
119+ }
120+ col(" Last Error" ) {
121+ case (_, receiverInfo) => receiverInfo.map { info =>
122+ val msg = s " ${info.lastErrorMessage} - ${info.lastError}"
123+ if (msg.size > 100 ) msg.take(97 ) + " ..." else msg
124+ }.getOrElse(empty)
125+ }
126+ build
107127 }
108- Some (listingTable(headerRow, dataRows))
128+
129+ val dataRows = (0 until listener.numReceivers).map { id => (id, listener.receiverInfo(id)) }
130+ Some (tableRenderer.render(dataRows))
109131 } else {
110132 None
111133 }
@@ -121,33 +143,20 @@ private[ui] class StreamingPage(parent: StreamingTab)
121143 private def generateBatchStatsTable (): Seq [Node ] = {
122144 val numBatches = listener.retainedCompletedBatches.size
123145 val lastCompletedBatch = listener.lastCompletedBatch
146+
124147 val table = if (numBatches > 0 ) {
125- val processingDelayQuantilesRow = {
126- Seq (
127- " Processing Time" ,
128- formatDurationOption(lastCompletedBatch.flatMap(_.processingDelay))
129- ) ++ getQuantiles(listener.processingDelayDistribution)
130- }
131- val schedulingDelayQuantilesRow = {
132- Seq (
133- " Scheduling Delay" ,
134- formatDurationOption(lastCompletedBatch.flatMap(_.schedulingDelay))
135- ) ++ getQuantiles(listener.schedulingDelayDistribution)
136- }
137- val totalDelayQuantilesRow = {
138- Seq (
139- " Total Delay" ,
140- formatDurationOption(lastCompletedBatch.flatMap(_.totalDelay))
141- ) ++ getQuantiles(listener.totalDelayDistribution)
142- }
143- val headerRow = Seq (" Metric" , " Last batch" , " Minimum" , " 25th percentile" ,
144- " Median" , " 75th percentile" , " Maximum" )
145- val dataRows : Seq [Seq [String ]] = Seq (
146- processingDelayQuantilesRow,
147- schedulingDelayQuantilesRow,
148- totalDelayQuantilesRow
148+ val rows : Seq [(String , Option [Long ], Option [Seq [Double ]])] = Seq (
149+ (" Processing Time" ,
150+ lastCompletedBatch.flatMap(_.processingDelay),
151+ listener.processingDelayDistribution.map(_.getQuantiles())),
152+ (" Scheduling Delay" ,
153+ lastCompletedBatch.flatMap(_.schedulingDelay),
154+ listener.schedulingDelayDistribution.map(_.getQuantiles())),
155+ (" Total Delay" ,
156+ lastCompletedBatch.flatMap(_.totalDelay),
157+ listener.totalDelayDistribution.map(_.getQuantiles()))
149158 )
150- Some (listingTable(headerRow, dataRows ))
159+ Some (batchStatsTable.render(rows ))
151160 } else {
152161 None
153162 }
@@ -162,26 +171,4 @@ private[ui] class StreamingPage(parent: StreamingTab)
162171
163172 content
164173 }
165-
166-
167- /**
168- * Returns a human-readable string representing a duration such as "5 second 35 ms"
169- */
170- private def formatDurationOption (msOption : Option [Long ]): String = {
171- msOption.map(formatDurationVerbose).getOrElse(emptyCell)
172- }
173-
174- /** Get quantiles for any time distribution */
175- private def getQuantiles (timeDistributionOption : Option [Distribution ]) = {
176- timeDistributionOption.get.getQuantiles().map { ms => formatDurationVerbose(ms.toLong) }
177- }
178-
179- /** Generate HTML table from string data */
180- private def listingTable (headers : Seq [String ], data : Seq [Seq [String ]]) = {
181- def generateDataRow (data : Seq [String ]): Seq [Node ] = {
182- <tr > {data.map(d => <td >{d}</td >)} </tr >
183- }
184- UIUtils .listingTable(headers, generateDataRow, data, fixedWidth = true )
185- }
186174}
187-
0 commit comments