@@ -620,37 +620,36 @@ abstract class DStream[T: ClassTag] (
620620 new ForEachDStream(this, context.sparkContext.clean(foreachFunc)).register()
621621 }
622622
623- // TODO move pyprint to PythonDStream
623+ // TODO: move pyprint to PythonDStream and execute it via a Py4J callback function
624624 /**
625625 * Print the first ten elements of each PythonRDD generated in this PythonDStream. This is an output
626626 * operator, so this PythonDStream will be registered as an output stream and materialized there.
627627 * Since the serialized Python objects are only readable by Python, pyprint writes the binary data to a
628628 * temporary file and runs a Python script to deserialize and print the first ten elements.
629+ *
630+ * Currently this calls the Python script directly; we should avoid that.
629631 */
630632 private[streaming] def pyprint() {
631633 def foreachFunc = (rdd: RDD[T], time: Time) => {
632634 val iter = rdd.take(11).iterator
633635
634- // make a temporary file
636+ // Generate a temporary file
635637 val prefix = "spark"
636638 val suffix = ".tmp"
637639 val tempFile = File.createTempFile(prefix, suffix)
638640 val tempFileStream = new DataOutputStream(new FileOutputStream(tempFile.getAbsolutePath))
639- // write out serialized python object
641+ // Write out the serialized Python objects to the temporary file
640642 PythonRDD.writeIteratorToStream(iter, tempFileStream)
641643 tempFileStream.close()
642644
643- // This value has to be passed from python
644- // Python currently does not do cluster deployment. But what happened
645+ // pythonExec should be passed in from Python; move pyprint to PythonDStream
645646 val pythonExec = new ProcessBuilder().environment().get("PYSPARK_PYTHON")
646647 val sparkHome = new ProcessBuilder().environment().get("SPARK_HOME")
647- // val pb = new ProcessBuilder(Seq(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath())) // why this fails to compile???
648- // absolute path to the python script is needed to change because we do not use pysparkstreaming
648+ // Call the Python script to deserialize the data and print the result to stdout
649649 val pb = new ProcessBuilder(pythonExec, sparkHome + "/python/pyspark/streaming/pyprint.py", tempFile.getAbsolutePath)
650650 val workerEnv = pb.environment()
651651
652- // envVars also need to be pass
653- // workerEnv.putAll(envVars)
652+ // envVars should also be passed in from Python
654653 val pythonPath = sparkHome + "/python/" + File.pathSeparator + workerEnv.get("PYTHONPATH")
655654 workerEnv.put("PYTHONPATH", pythonPath)
656655 val worker = pb.start()
@@ -662,7 +661,7 @@ abstract class DStream[T: ClassTag] (
662661 println("Time: " + time)
663662 println("-------------------------------------------")
664663
665- // print value from python std out
664+ // Print the values read from the Python process's stdout
666665 var line = ""
667666 breakable {
668667 while (true) {
@@ -671,7 +670,7 @@ abstract class DStream[T: ClassTag] (
671670 println(line)
672671 }
673672 }
674- // delete temporary file
673+ // Delete temporary file
675674 tempFile.delete()
676675 println()
677676
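
For reference, below is a minimal sketch of what the companion pyspark/streaming/pyprint.py script invoked above could look like. The script itself is not part of this diff, so the function name and the choice of serializer are assumptions: it assumes the temporary file contains the length-framed, pickled records that PythonRDD.writeIteratorToStream writes on the JVM side, and that a plain PickleSerializer can read them back.

# Hypothetical sketch of pyspark/streaming/pyprint.py (not part of this diff).
# Assumes the temp file holds length-framed pickled records as written by
# PythonRDD.writeIteratorToStream, readable with pyspark's PickleSerializer.
import sys
from pyspark.serializers import PickleSerializer

def print_first_ten(path):
    ser = PickleSerializer()
    with open(path, 'rb') as f:
        # load_stream yields one deserialized record per length-prefixed frame
        for i, record in enumerate(ser.load_stream(f)):
            if i >= 10:
                break
            print(record)

if __name__ == "__main__":
    # The Scala side passes the temp file path as the only argument
    print_first_ten(sys.argv[1])

This also illustrates why the TODO suggests a Py4J callback instead: launching a fresh Python process and writing a temp file for every batch is costly, whereas a callback could hand the records to the already-running Python driver process.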