Conversation

@wangxiaojing
Contributor

For text files, there is the method streamingContext.textFileStream(dataDirectory).
This improvement extends streaming to support subdirectories: Spark Streaming can monitor the subdirectories of dataDirectory and process any files created in them.
E.g. streamingContext.textFileStream("/test").
Given the directory contents:
/test/file1
/test/file2
/test/dr/file1
If the directory "/test/dr/" gets a new file "file2", Spark Streaming can process that file.
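
A minimal sketch of the intended usage (the application name and batch interval are illustrative):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SubdirStreamExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SubdirStreamExample")
    val ssc = new StreamingContext(conf, Seconds(10))

    // With this improvement, files created under /test and its
    // subdirectories (e.g. /test/dr/file2) are picked up as well.
    val lines = ssc.textFileStream("/test")
    lines.count().print()

    ssc.start()
    ssc.awaitTermination()
  }
}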

@AmplabJenkins

Can one of the admins verify this patch?


Member

Remove this System.out

@jerryshao
Contributor

Hi @wangxiaojing, a small suggestion: why not make this improvement more flexible by adding a parameter to control the search depth of directories? That would be more general than the current 1-depth implementation. Like:

class FileInputDStream[K: ClassTag, V: ClassTag, F <: NewInputFormat[K,V] : ClassTag](
    @transient ssc_ : StreamingContext,
    directory: String,
    filter: Path => Boolean = FileInputDStream.defaultFilter,
    depth: Int = 1,
    newFilesOnly: Boolean = true)

People can use this parameter to control the search depth; the default of 1 keeps the same semantics as the current code.
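
A hypothetical call against this proposed signature might look as follows (the depth parameter does not exist in the current API, and FileInputDStream is internal to Spark, so a user-facing version would more likely be exposed through StreamingContext.fileStream):

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat

// Hypothetical: monitor the directory and its subdirectories up to
// two levels deep; the filter keeps its default value.
val stream = new FileInputDStream[LongWritable, Text, TextInputFormat](
  ssc, "/test", depth = 2)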

Besides, some whitespace-related code style issues should be changed to align with the Scala style guide.

@wangxiaojing
Contributor Author

Hi @jerryshao, I changed the code to use this parameter to control the search depth, but if the depth is greater than 1, the ignore time is not reasonable: when a second-level subdirectory gets a new file, the modification time of the first-level subdirectory does not change. For example:
The streaming job monitors the directory /tmp/.
The directory structure is:
2014-10-16 19:17 /tmp/spark1
2014-10-16 19:17 /tmp/spark1/spark2

After a file is created in /tmp/spark1/spark2:

2014-10-16 19:17 /tmp/spark1
2014-10-16 19:18 /tmp/spark1/spark2
2014-10-16 19:18 /tmp/spark1/spark2/file

If you use the ignore time for filtering, the first-level subdirectory is always ignored. Can you give me some advice?

@jerryshao
Contributor

Can we just check the modification time of the files, not the directories, to filter out unqualified files? I'm not sure about this.

cc @tdas , mind taking a look at this?
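
A minimal sketch of that idea, assuming the Hadoop FileSystem API (the helper name and structure are illustrative, not the PR's actual code):

import org.apache.hadoop.fs.{FileSystem, Path}

// Recursively list files up to `depth` levels below `dir`, keeping only
// files whose own modification time is at least `ignoreTime`. Directory
// timestamps are never consulted, so a stale parent directory cannot
// hide a fresh file in a nested subdirectory.
def listRecentFiles(fs: FileSystem, dir: Path, depth: Int,
    ignoreTime: Long): Seq[Path] = {
  fs.listStatus(dir).toSeq.flatMap { status =>
    if (status.isDirectory && depth > 1) {
      listRecentFiles(fs, status.getPath, depth - 1, ignoreTime)
    } else if (!status.isDirectory && status.getModificationTime >= ignoreTime) {
      Seq(status.getPath)
    } else {
      Seq.empty[Path]
    }
  }
}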

@wangxiaojing
Contributor Author

@jerryshao @tdas First, walk all the directories according to the depth, then filter the directories by comparing their modification time against the ignore time. Is this method optimal? Thanks.

@wangxiaojing
Contributor Author

@liancheng

Contributor

  • Add space after ,
  • Remove space before :
  • Add space after :
  • Add space after =

@wangxiaojing force-pushed the spark-3586 branch 2 times, most recently from c6f1c75 to d1c3399 on May 20, 2015 08:41
@erfangc

erfangc commented May 30, 2015

This feature would definitely be helpful. Thanks to @wangxiaojing and whoever continues to work on this PR!

@zsxwing
Member

zsxwing commented Jun 1, 2015

@wangxiaojing could you update this PR? It conflicts with master.

Member

Could you change System.currentTimeMillis to clock.getTimeMillis()?
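
For reference, a minimal stand-in for the kind of clock abstraction meant here (Spark's own Clock trait is private to Spark; the names below are illustrative):

// Injecting a clock instead of calling System.currentTimeMillis directly
// lets unit tests substitute a manually advanced clock.
trait Clock { def getTimeMillis(): Long }

class SystemClock extends Clock {
  def getTimeMillis(): Long = System.currentTimeMillis()
}

class Scanner(clock: Clock = new SystemClock) {
  // Time lookups go through the injected clock.
  def timestampScan(): Long = clock.getTimeMillis()
}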

@andrewor14
Contributor

Hi @wangxiaojing, it seems that #6588 is an updated version of this PR. Would you mind closing this patch, since it no longer merges cleanly with master?

@wangxiaojing
Contributor Author

@andrewor14 ok.
