@@ -312,6 +312,37 @@ class JavaStreamingContext(val ssc: StreamingContext) extends Closeable {
312312 ssc.fileStream[K , V , F ](directory, fn, newFilesOnly)
313313 }
314314
315+ /**
316+ * Create an input stream that monitors a Hadoop-compatible filesystem
317+ * for new files and reads them using the given key-value types and input format.
318+ * Files must be written to the monitored directory by "moving" them from another
319+ * location within the same file system. File names starting with . are ignored.
320+ * @param directory HDFS directory to monitor for new file
321+ * @param kClass class of key for reading HDFS file
322+ * @param vClass class of value for reading HDFS file
323+ * @param fClass class of input format for reading HDFS file
324+ * @param filter Function to filter paths to process
325+ * @param newFilesOnly Should process only new files and ignore existing files in the directory
326+ * @param conf Hadoop configuration
327+ * @tparam K Key type for reading HDFS file
328+ * @tparam V Value type for reading HDFS file
329+ * @tparam F Input format for reading HDFS file
330+ */
331+ def fileStream [K , V , F <: NewInputFormat [K , V ]](
332+ directory : String ,
333+ kClass : Class [K ],
334+ vClass : Class [V ],
335+ fClass : Class [F ],
336+ filter : JFunction [Path , JBoolean ],
337+ newFilesOnly : Boolean ,
338+ conf : Configuration ): JavaPairInputDStream [K , V ] = {
339+ implicit val cmk : ClassTag [K ] = ClassTag (kClass)
340+ implicit val cmv : ClassTag [V ] = ClassTag (vClass)
341+ implicit val cmf : ClassTag [F ] = ClassTag (fClass)
342+ def fn = (x : Path ) => filter.call(x).booleanValue()
343+ ssc.fileStream[K , V , F ](directory, fn, newFilesOnly, conf)
344+ }
345+
315346 /**
316347 * Create an input stream with any arbitrary user implemented actor receiver.
317348 * @param props Props object defining creation of the actor
0 commit comments