Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,13 @@ final class BlockingThreadPoolExecutorService

private static final AtomicInteger POOLNUMBER = new AtomicInteger(1);

private final int maxActiveTasks;
private final ThreadPoolExecutor eventProcessingExecutor;

public int getMaxActiveTasks() {
return maxActiveTasks;
}

/**
* Returns a {@link java.util.concurrent.ThreadFactory} that names each
* created thread uniquely,
Expand Down Expand Up @@ -104,10 +109,17 @@ public Thread newThread(Runnable r) {
};
}

/**
* Create an instance.
* @param permitCount total permit count
* @param maxActiveTasks maximum number of active tasks (for lookup only)
* @param eventProcessingExecutor the executor doing the real work.
*/
private BlockingThreadPoolExecutorService(int permitCount,
ThreadPoolExecutor eventProcessingExecutor) {
int maxActiveTasks, ThreadPoolExecutor eventProcessingExecutor) {
super(MoreExecutors.listeningDecorator(eventProcessingExecutor),
permitCount, false);
this.maxActiveTasks = maxActiveTasks;
this.eventProcessingExecutor = eventProcessingExecutor;
}

Expand All @@ -131,8 +143,9 @@ public static BlockingThreadPoolExecutorService newInstance(
/* Although we generally only expect up to waitingTasks tasks in the
queue, we need to be able to buffer all tasks in case dequeueing is
slower than enqueueing. */
int totalTasks = waitingTasks + activeTasks;
final BlockingQueue<Runnable> workQueue =
new LinkedBlockingQueue<>(waitingTasks + activeTasks);
new LinkedBlockingQueue<>(totalTasks);
ThreadPoolExecutor eventProcessingExecutor =
new ThreadPoolExecutor(activeTasks, activeTasks, keepAliveTime, unit,
workQueue, newDaemonThreadFactory(prefixName),
Expand All @@ -146,7 +159,7 @@ public void rejectedExecution(Runnable r,
}
});
eventProcessingExecutor.allowCoreThreadTimeOut(true);
return new BlockingThreadPoolExecutorService(waitingTasks + activeTasks,
return new BlockingThreadPoolExecutorService(totalTasks, activeTasks,
eventProcessingExecutor);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a;

/**
* Execute copy operations in parallel; blocks until all operations
* are completed.
*/
class ParallelCopier {
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.s3a;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;

/**
* Error to indicate that a specific rename failed.
* Target path is set to destination.
*/
public class RenameFailedException extends PathIOException {

private boolean exitCode = false;

public RenameFailedException(String src, String dest, Throwable cause) {
super(src, cause);
setOperation("rename");
setTargetPath(dest);
}

public RenameFailedException(String src, String dest, String error) {
super(src, error);
setOperation("rename");
setTargetPath(dest);
}

public RenameFailedException(Path src, Path optionalDest, String error) {
super(src.toString(), error);
setOperation("rename");
if (optionalDest != null) {
setTargetPath(optionalDest.toString());
}
}

public boolean getExitCode() {
return exitCode;
}

public RenameFailedException withExitCode(boolean exitCode) {
this.exitCode = exitCode;
return this;
}
}
Loading