|
22 | 22 | from pyspark.storagelevel import StorageLevel |
23 | 23 | from pyspark.streaming.util import rddToFileName, RDDFunction, RDDFunction2 |
24 | 24 | from pyspark.rdd import portable_hash |
25 | | -from pyspark.streaming.duration import Duration, Seconds |
26 | 25 | from pyspark.resultiterable import ResultIterable |
27 | 26 |
|
28 | 27 | __all__ = ["DStream"] |
@@ -334,10 +333,10 @@ def slice(self, begin, end): |
334 | 333 | return [RDD(jrdd, self.ctx, self._jrdd_deserializer) for jrdd in jrdds] |
335 | 334 |
|
336 | 335 | def window(self, windowDuration, slideDuration=None): |
337 | | - d = Seconds(windowDuration) |
| 336 | + d = self._ssc._jduration(windowDuration) |
338 | 337 | if slideDuration is None: |
339 | 338 | return DStream(self._jdstream.window(d), self._ssc, self._jrdd_deserializer) |
340 | | - s = Seconds(slideDuration) |
| 339 | + s = self._ssc._jduration(slideDuration) |
341 | 340 | return DStream(self._jdstream.window(d, s), self._ssc, self._jrdd_deserializer) |
342 | 341 |
|
343 | 342 | def reduceByWindow(self, reduceFunc, invReduceFunc, windowDuration, slideDuration): |
@@ -375,16 +374,12 @@ def invReduceFunc(a, b, t): |
375 | 374 | joined = a.leftOuterJoin(b, numPartitions) |
376 | 375 | return joined.mapValues(lambda (v1, v2): invFunc(v1, v2) if v2 is not None else v1) |
377 | 376 |
|
378 | | - if not isinstance(windowDuration, Duration): |
379 | | - windowDuration = Seconds(windowDuration) |
380 | | - if not isinstance(slideDuration, Duration): |
381 | | - slideDuration = Seconds(slideDuration) |
382 | 377 | jreduceFunc = RDDFunction2(self.ctx, reduceFunc, reduced._jrdd_deserializer) |
383 | 378 | jinvReduceFunc = RDDFunction2(self.ctx, invReduceFunc, reduced._jrdd_deserializer) |
384 | 379 | dstream = self.ctx._jvm.PythonReducedWindowedDStream(reduced._jdstream.dstream(), |
385 | 380 | jreduceFunc, jinvReduceFunc, |
386 | | - windowDuration._jduration, |
387 | | - slideDuration._jduration) |
| 381 | + self._ssc._jduration(windowDuration), |
| 382 | + self._ssc._jduration(slideDuration)) |
388 | 383 | return DStream(dstream.asJavaDStream(), self._ssc, self.ctx.serializer) |
389 | 384 |
|
390 | 385 | def updateStateByKey(self, updateFunc, numPartitions=None): |
|
0 commit comments