Skip to content

Commit fbd7e2b

Browse files
committed
Merge branch 'trunk' of github.com:apache/hadoop into HADOOP-15984-trunk-2
2 parents 32d214f + 130bd03 commit fbd7e2b

File tree

100 files changed

+3489
-1071
lines changed

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

100 files changed

+3489
-1071
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeysPublic.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -214,6 +214,15 @@ public class CommonConfigurationKeysPublic {
214214
public static final String FS_TRASH_INTERVAL_KEY = "fs.trash.interval";
215215
/** Default value for FS_TRASH_INTERVAL_KEY */
216216
public static final long FS_TRASH_INTERVAL_DEFAULT = 0;
217+
/**
218+
* @see
219+
* <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">
220+
* core-default.xml</a>
221+
*/
222+
public static final String FS_TRASH_CLEAN_TRASHROOT_ENABLE_KEY =
223+
"fs.trash.clean.trashroot.enable";
224+
/** Default value for FS_TRASH_CLEAN_TRASHROOT_ENABLE_KEY. */
225+
public static final boolean FS_TRASH_CLEAN_TRASHROOT_ENABLE_DEFAULT = false;
217226
/**
218227
* @see
219228
* <a href="{@docRoot}/../hadoop-project-dist/hadoop-common/core-default.xml">

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/TrashPolicyDefault.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,8 @@
1919

2020
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT;
2121
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CHECKPOINT_INTERVAL_KEY;
22+
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CLEAN_TRASHROOT_ENABLE_DEFAULT;
23+
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_CLEAN_TRASHROOT_ENABLE_KEY;
2224
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_DEFAULT;
2325
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY;
2426

@@ -70,6 +72,8 @@ public class TrashPolicyDefault extends TrashPolicy {
7072

7173
private long emptierInterval;
7274

75+
private boolean cleanNonCheckpointUnderTrashRoot;
76+
7377
public TrashPolicyDefault() { }
7478

7579
private TrashPolicyDefault(FileSystem fs, Configuration conf)
@@ -90,6 +94,8 @@ public void initialize(Configuration conf, FileSystem fs, Path home) {
9094
this.emptierInterval = (long)(conf.getFloat(
9195
FS_TRASH_CHECKPOINT_INTERVAL_KEY, FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT)
9296
* MSECS_PER_MINUTE);
97+
this.cleanNonCheckpointUnderTrashRoot = conf.getBoolean(
98+
FS_TRASH_CLEAN_TRASHROOT_ENABLE_KEY, FS_TRASH_CLEAN_TRASHROOT_ENABLE_DEFAULT);
9399
}
94100

95101
@Override
@@ -101,6 +107,8 @@ public void initialize(Configuration conf, FileSystem fs) {
101107
this.emptierInterval = (long)(conf.getFloat(
102108
FS_TRASH_CHECKPOINT_INTERVAL_KEY, FS_TRASH_CHECKPOINT_INTERVAL_DEFAULT)
103109
* MSECS_PER_MINUTE);
110+
this.cleanNonCheckpointUnderTrashRoot = conf.getBoolean(
111+
FS_TRASH_CLEAN_TRASHROOT_ENABLE_KEY, FS_TRASH_CLEAN_TRASHROOT_ENABLE_DEFAULT);
104112
if (deletionInterval < 0) {
105113
LOG.warn("Invalid value {} for deletion interval,"
106114
+ " deletion interaval can not be negative."
@@ -374,8 +382,14 @@ private void deleteCheckpoint(Path trashRoot, boolean deleteImmediately)
374382
try {
375383
time = getTimeFromCheckpoint(name);
376384
} catch (ParseException e) {
377-
LOG.warn("Unexpected item in trash: "+dir+". Ignoring.");
378-
continue;
385+
if (cleanNonCheckpointUnderTrashRoot) {
386+
fs.delete(path, true);
387+
LOG.warn("Unexpected item in trash: " + dir + ". Deleting.");
388+
continue;
389+
} else {
390+
LOG.warn("Unexpected item in trash: " + dir + ". Ignoring.");
391+
continue;
392+
}
379393
}
380394

381395
if (((now - deletionInterval) > time) || deleteImmediately) {

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,7 @@ public abstract class CachingBlockManager extends BlockManager {
110110
* @param prefetchingStatistics statistics for this stream.
111111
* @param conf the configuration.
112112
* @param localDirAllocator the local dir allocator instance.
113+
* @param maxBlocksCount max blocks count to be kept in cache at any time.
113114
* @throws IllegalArgumentException if bufferPoolSize is zero or negative.
114115
*/
115116
public CachingBlockManager(
@@ -118,7 +119,8 @@ public CachingBlockManager(
118119
int bufferPoolSize,
119120
PrefetchingStatistics prefetchingStatistics,
120121
Configuration conf,
121-
LocalDirAllocator localDirAllocator) {
122+
LocalDirAllocator localDirAllocator,
123+
int maxBlocksCount) {
122124
super(blockData);
123125

124126
Validate.checkPositiveInteger(bufferPoolSize, "bufferPoolSize");
@@ -129,16 +131,16 @@ public CachingBlockManager(
129131
this.numReadErrors = new AtomicInteger();
130132
this.cachingDisabled = new AtomicBoolean();
131133
this.prefetchingStatistics = requireNonNull(prefetchingStatistics);
134+
this.conf = requireNonNull(conf);
132135

133136
if (this.getBlockData().getFileSize() > 0) {
134137
this.bufferPool = new BufferPool(bufferPoolSize, this.getBlockData().getBlockSize(),
135138
this.prefetchingStatistics);
136-
this.cache = this.createCache();
139+
this.cache = this.createCache(maxBlocksCount);
137140
}
138141

139142
this.ops = new BlockOperations();
140143
this.ops.setDebug(false);
141-
this.conf = requireNonNull(conf);
142144
this.localDirAllocator = localDirAllocator;
143145
}
144146

@@ -557,8 +559,8 @@ private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture,
557559
}
558560
}
559561

560-
protected BlockCache createCache() {
561-
return new SingleFilePerBlockCache(prefetchingStatistics);
562+
protected BlockCache createCache(int maxBlocksCount) {
563+
return new SingleFilePerBlockCache(prefetchingStatistics, maxBlocksCount);
562564
}
563565

564566
protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException {
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.hadoop.fs.impl.prefetch;
21+
22+
import java.util.concurrent.TimeUnit;
23+
24+
/**
25+
* Constants used by prefetch implementations.
26+
*/
27+
public final class PrefetchConstants {
28+
29+
private PrefetchConstants() {
30+
}
31+
32+
/**
33+
* Timeout to be used by close, while acquiring prefetch block write lock.
34+
* Value = {@value PREFETCH_WRITE_LOCK_TIMEOUT}
35+
*/
36+
static final int PREFETCH_WRITE_LOCK_TIMEOUT = 5;
37+
38+
/**
39+
* Lock timeout unit to be used by the thread while acquiring prefetch block write lock.
40+
* Value = {@value PREFETCH_WRITE_LOCK_TIMEOUT_UNIT}
41+
*/
42+
static final TimeUnit PREFETCH_WRITE_LOCK_TIMEOUT_UNIT = TimeUnit.SECONDS;
43+
44+
}

0 commit comments

Comments
 (0)