@@ -110,19 +110,13 @@ abstract class DStream[T: ClassTag] (
110110 val creationSite = Utils .getCallSite
111111
112112 /* Store the RDD creation callSite in threadlocal */
113- private def setRDDCreationCallSite () = {
114- ssc.sparkContext.setLocalProperty(Utils .CALL_SITE_SHORT , creationSite.shortForm)
115- ssc.sparkContext.setLocalProperty(Utils .CALL_SITE_LONG , creationSite.longForm)
116- }
117-
118- /* Store the supplied callSite in threadlocal */
119- private def setRDDCallSite (callSite : CallSite ) = {
113+ private def setRDDCreationCallSite (callSite : CallSite = creationSite) = {
120114 ssc.sparkContext.setLocalProperty(Utils .CALL_SITE_SHORT , callSite.shortForm)
121115 ssc.sparkContext.setLocalProperty(Utils .CALL_SITE_LONG , callSite.longForm)
122116 }
123117
124118 /* Return the current callSite */
125- private [streaming] def getCallSite (): CallSite = {
119+ private [streaming] def getRDDCreationCallSite (): CallSite = {
126120 CallSite (ssc.sparkContext.getLocalProperty(Utils .CALL_SITE_SHORT ),
127121 ssc.sparkContext.getLocalProperty(Utils .CALL_SITE_LONG ))
128122 }
@@ -309,8 +303,8 @@ abstract class DStream[T: ClassTag] (
309303 // (based on sliding time of this DStream), then generate the RDD
310304 case None => {
311305 if (isTimeValid(time)) {
312- val prevCallSite = getCallSite
313- setRDDCreationCallSite
306+ val prevCallSite = getRDDCreationCallSite
307+ setRDDCreationCallSite()
314308 val rddOption = compute(time) match {
315309 case Some (newRDD) =>
316310 if (storageLevel != StorageLevel .NONE ) {
@@ -329,7 +323,7 @@ abstract class DStream[T: ClassTag] (
329323 case None =>
330324 return None
331325 }
332- setRDDCallSite (prevCallSite)
326+ setRDDCreationCallSite (prevCallSite)
333327 return rddOption
334328 } else {
335329 return None
0 commit comments