2626import java .util .Iterator ;
2727import java .util .LinkedList ;
2828import java .util .List ;
29- import java .util .concurrent .ThreadPoolExecutor ;
30- import java .util .concurrent .ArrayBlockingQueue ;
31- import java .util .concurrent .TimeUnit ;
3229
33- import org .apache .hadoop .thirdparty .com .google .common .annotations .VisibleForTesting ;
3430import org .apache .hadoop .classification .InterfaceAudience ;
3531import org .apache .hadoop .classification .InterfaceStability ;
3632import org .apache .hadoop .fs .FSDataInputStream ;
3733import org .apache .hadoop .fs .FSDataOutputStream ;
3834import org .apache .hadoop .fs .Path ;
3935import org .apache .hadoop .fs .PathIsDirectoryException ;
4036import org .apache .hadoop .io .IOUtils ;
41- import org .slf4j .Logger ;
42- import org .slf4j .LoggerFactory ;
4337
4438/** Various commands for copying files */
4539@ InterfaceAudience .Private
@@ -210,28 +204,37 @@ private void popPreserveOption(List<String> args) {
210204 /**
211205 * Copy files from a remote filesystem to the local filesystem
212206 */
213- public static class Get extends CommandWithDestination {
207+ public static class Get extends CopyCommandWithMultiThread {
214208 public static final String NAME = "get" ;
215209 public static final String USAGE =
216- "[-f] [-p] [-ignoreCrc] [-crc] <src> ... <localdst>" ;
210+ "[-f] [-p] [-crc] [-ignoreCrc] [-t <thread count>]"
211+ + " [-q <thread pool queue size>] <src> ... <localdst>" ;
217212 public static final String DESCRIPTION =
218- "Copy files that match the file pattern <src> " +
219- "to the local name. <src> is kept. When copying multiple " +
220- "files, the destination must be a directory. Passing " +
221- "-f overwrites the destination if it already exists and " +
222- "-p preserves access and modification times, " +
223- "ownership and the mode.\n " ;
213+ "Copy files that match the file pattern <src> to the local name. "
214+ + "<src> is kept.\n When copying multiple files, the destination"
215+ + " must be a directory.\n Flags:\n "
216+ + " -p : Preserves timestamps, ownership and the mode.\n "
217+ + " -f : Overwrites the destination if it already exists.\n "
218+ + " -crc : write CRC checksums for the files downloaded.\n "
219+ + " -ignoreCrc : Skip CRC checks on the file(s) downloaded.\n "
220+ + " -t <thread count> : Number of threads to be used,"
221+ + " default is 1.\n "
222+ + " -q <thread pool queue size> : Thread pool queue size to be"
223+ + " used, default is 1024.\n " ;
224224
225225 @ Override
226- protected void processOptions (LinkedList <String > args )
227- throws IOException {
228- CommandFormat cf = new CommandFormat (
229- 1 , Integer .MAX_VALUE , "crc" , "ignoreCrc" , "p" , "f" );
226+ protected void processOptions (LinkedList <String > args ) throws IOException {
227+ CommandFormat cf =
228+ new CommandFormat (1 , Integer .MAX_VALUE , "crc" , "ignoreCrc" , "p" , "f" );
229+ cf .addOptionWithValue ("t" );
230+ cf .addOptionWithValue ("q" );
230231 cf .parse (args );
231232 setWriteChecksum (cf .getOpt ("crc" ));
232233 setVerifyChecksum (!cf .getOpt ("ignoreCrc" ));
233234 setPreserve (cf .getOpt ("p" ));
234235 setOverwrite (cf .getOpt ("f" ));
236+ setThreadCount (cf .getOptValue ("t" ));
237+ setThreadPoolQueueSize (cf .getOptValue ("q" ));
235238 setRecursive (true );
236239 getLocalDestination (args );
237240 }
@@ -240,21 +243,12 @@ protected void processOptions(LinkedList<String> args)
240243 /**
241244 * Copy local files to a remote filesystem
242245 */
243- public static class Put extends CommandWithDestination {
244-
245- public static final Logger LOG = LoggerFactory .getLogger (Put .class );
246-
247- private ThreadPoolExecutor executor = null ;
248- private int threadPoolQueueSize = 1024 ;
249- private int numThreads = 1 ;
250-
251- private static final int MAX_THREADS =
252- Runtime .getRuntime ().availableProcessors () * 2 ;
246+ public static class Put extends CopyCommandWithMultiThread {
253247
254248 public static final String NAME = "put" ;
255249 public static final String USAGE =
256- "[-f] [-p] [-l] [-d] [-t <thread count>] [-q <threadPool queue size>] " +
257- " <localsrc> ... <dst>" ;
250+ "[-f] [-p] [-l] [-d] [-t <thread count>] [-q <thread pool queue size>]"
251+ + " <localsrc> ... <dst>" ;
258252 public static final String DESCRIPTION =
259253 "Copy files from the local file system " +
260254 "into fs. Copying fails if the file already " +
@@ -263,11 +257,11 @@ public static class Put extends CommandWithDestination {
263257 " -p : Preserves timestamps, ownership and the mode.\n " +
264258 " -f : Overwrites the destination if it already exists.\n " +
265259 " -t <thread count> : Number of threads to be used, default is 1.\n " +
266- " -q <threadPool size> : ThreadPool queue size to be used, " +
260+ " -q <thread pool queue size> : Thread pool queue size to be used, " +
267261 "default is 1024.\n " +
268- " -l : Allow DataNode to lazily persist the file to disk. Forces" +
269- " replication factor of 1. This flag will result in reduced" +
270- " durability. Use with care.\n " +
262+ " -l : Allow DataNode to lazily persist the file to disk. Forces " +
263+ "replication factor of 1. This flag will result in reduced " +
264+ "durability. Use with care.\n " +
271265 " -d : Skip creation of temporary file(<dst>._COPYING_).\n " ;
272266
273267 @ Override
@@ -277,7 +271,7 @@ protected void processOptions(LinkedList<String> args) throws IOException {
277271 cf .addOptionWithValue ("t" );
278272 cf .addOptionWithValue ("q" );
279273 cf .parse (args );
280- setNumberThreads (cf .getOptValue ("t" ));
274+ setThreadCount (cf .getOptValue ("t" ));
281275 setThreadPoolQueueSize (cf .getOptValue ("q" ));
282276 setOverwrite (cf .getOpt ("f" ));
283277 setPreserve (cf .getOpt ("p" ));
@@ -308,92 +302,9 @@ protected void processArguments(LinkedList<PathData> args)
308302 copyStreamToTarget (System .in , getTargetPath (args .get (0 )));
309303 return ;
310304 }
311-
312- executor = new ThreadPoolExecutor (numThreads , numThreads , 1 ,
313- TimeUnit .SECONDS , new ArrayBlockingQueue <>(threadPoolQueueSize ),
314- new ThreadPoolExecutor .CallerRunsPolicy ());
315305 super .processArguments (args );
316-
317- // issue the command and then wait for it to finish
318- executor .shutdown ();
319- try {
320- executor .awaitTermination (Long .MAX_VALUE , TimeUnit .MINUTES );
321- } catch (InterruptedException e ) {
322- executor .shutdownNow ();
323- displayError (e );
324- Thread .currentThread ().interrupt ();
325- }
326- }
327-
328- private void setNumberThreads (String numberThreadsString ) {
329- if (numberThreadsString == null ) {
330- numThreads = 1 ;
331- } else {
332- int parsedValue = Integer .parseInt (numberThreadsString );
333- if (parsedValue <= 1 ) {
334- numThreads = 1 ;
335- } else if (parsedValue > MAX_THREADS ) {
336- numThreads = MAX_THREADS ;
337- } else {
338- numThreads = parsedValue ;
339- }
340- }
341- }
342-
343- private void setThreadPoolQueueSize (String numThreadPoolQueueSize ) {
344- if (numThreadPoolQueueSize != null ) {
345- int parsedValue = Integer .parseInt (numThreadPoolQueueSize );
346- if (parsedValue < 1 ) {
347- LOG .warn ("The value of the thread pool queue size cannot be " +
348- "less than 1, and the default value is used here. " +
349- "The default size is 1024." );
350- threadPoolQueueSize = 1024 ;
351- } else {
352- threadPoolQueueSize = parsedValue ;
353- }
354- }
355- }
356-
357- @ VisibleForTesting
358- protected int getThreadPoolQueueSize () {
359- return threadPoolQueueSize ;
360- }
361-
362- private void copyFile (PathData src , PathData target ) throws IOException {
363- if (isPathRecursable (src )) {
364- throw new PathIsDirectoryException (src .toString ());
365- }
366- super .copyFileToTarget (src , target );
367- }
368-
369- @ Override
370- protected void copyFileToTarget (PathData src , PathData target )
371- throws IOException {
372- // if number of thread is 1, mimic put and avoid threading overhead
373- if (numThreads == 1 ) {
374- copyFile (src , target );
375- return ;
376- }
377-
378- Runnable task = () -> {
379- try {
380- copyFile (src , target );
381- } catch (IOException e ) {
382- displayError (e );
383- }
384- };
385- executor .submit (task );
386306 }
387307
388- @ VisibleForTesting
389- public int getNumThreads () {
390- return numThreads ;
391- }
392-
393- @ VisibleForTesting
394- public ThreadPoolExecutor getExecutor () {
395- return executor ;
396- }
397308 }
398309
399310 public static class CopyFromLocal extends Put {
0 commit comments