@@ -2651,4 +2651,13 @@ List<LogEntry> getLogEntries(Set<ServerName> serverNames, String logType, Server
* Get the list of cached files
*/
List<String> getCachedFilesList(ServerName serverName) throws IOException;

/**
* Clean the cache by evicting the blocks of files belonging to regions that are no longer served
* by the RegionServer.
* @param serverName the RegionServer whose cache should be cleaned
* @return a map of file name to the number of blocks evicted
* @throws IOException if a remote or network exception occurs
*/
Map<String, Integer> uncacheStaleBlocks(ServerName serverName) throws IOException;
}
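
A quick illustration of how the new synchronous API above might be exercised — a hedged sketch only; the configuration and the server name are placeholders, not part of this patch:

import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class UncacheStaleBlocksSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    try (Connection connection = ConnectionFactory.createConnection(conf);
        Admin admin = connection.getAdmin()) {
      // Placeholder server name; use a live RegionServer from the cluster in practice.
      ServerName serverName = ServerName.valueOf("rs-host.example.com,16020,1700000000000");
      Map<String, Integer> evicted = admin.uncacheStaleBlocks(serverName);
      evicted.forEach((file, blocks) ->
        System.out.println("Evicted " + blocks + " stale blocks from " + file));
    }
  }
}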
@@ -1136,4 +1136,9 @@ public void flushMasterStore() throws IOException {
public List<String> getCachedFilesList(ServerName serverName) throws IOException {
return get(admin.getCachedFilesList(serverName));
}

@Override
public Map<String, Integer> uncacheStaleBlocks(ServerName serverName) throws IOException {
return get(admin.uncacheStaleBlocks(serverName));
}
}
@@ -1861,4 +1861,12 @@ CompletableFuture<List<LogEntry>> getLogEntries(Set<ServerName> serverNames, Str
* Get the list of cached files
*/
CompletableFuture<List<String>> getCachedFilesList(ServerName serverName);

/**
* Clean the cache by evicting the blocks of files belonging to regions that are no longer served
* by the RegionServer.
* @param serverName the RegionServer whose cache should be cleaned
* @return a map of file name to the number of blocks evicted
*/
CompletableFuture<Map<String, Integer>> uncacheStaleBlocks(ServerName serverName);
}
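
The asynchronous counterpart returns a CompletableFuture; a minimal sketch under the same placeholder assumptions:

import java.util.Map;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncAdmin;
import org.apache.hadoop.hbase.client.AsyncConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;

public class AsyncUncacheStaleBlocksSketch {
  public static void main(String[] args) throws Exception {
    try (AsyncConnection conn =
        ConnectionFactory.createAsyncConnection(HBaseConfiguration.create()).get()) {
      AsyncAdmin admin = conn.getAdmin();
      // Placeholder server name; pick a live RegionServer in practice.
      ServerName serverName = ServerName.valueOf("rs-host.example.com,16020,1700000000000");
      // Block here only for the sake of the example; callers can also chain thenAccept/whenComplete.
      Map<String, Integer> evicted = admin.uncacheStaleBlocks(serverName).get();
      evicted.forEach((file, blocks) -> System.out.println(file + " -> " + blocks));
    }
  }
}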
@@ -1005,4 +1005,9 @@ public CompletableFuture<Void> flushMasterStore() {
public CompletableFuture<List<String>> getCachedFilesList(ServerName serverName) {
return wrap(rawAdmin.getCachedFilesList(serverName));
}

@Override
public CompletableFuture<Map<String, Integer>> uncacheStaleBlocks(ServerName serverName) {
return wrap(rawAdmin.uncacheStaleBlocks(serverName));
}
}
@@ -144,6 +144,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UncacheStaleBlocksRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UncacheStaleBlocksResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateConfigurationResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
@@ -4537,4 +4539,15 @@ List<String>> adminCall(controller, stub, request.build(),
resp -> resp.getCachedFilesList()))
.serverName(serverName).call();
}

@Override
public CompletableFuture<Map<String, Integer>> uncacheStaleBlocks(ServerName serverName) {
UncacheStaleBlocksRequest.Builder request = UncacheStaleBlocksRequest.newBuilder();
return this.<Map<String, Integer>> newAdminCaller()
.action((controller, stub) -> this.<UncacheStaleBlocksRequest, UncacheStaleBlocksResponse,
Map<String, Integer>> adminCall(controller, stub, request.build(),
(s, c, req, done) -> s.uncacheStaleBlocks(c, req, done),
resp -> resp.getUncachedFilesMap()))
.serverName(serverName).call();
}
}
@@ -166,6 +166,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetStoreFileResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.OpenRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ServerInfo;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UncacheStaleBlocksRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UncacheStaleBlocksResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CellProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
@@ -1835,6 +1837,22 @@ public static List<String> getCachedFilesList(final RpcController controller,
return new ArrayList<>(response.getCachedFilesList());
}

/**
* Clean the cache by evicting the blocks of files belonging to regions that are no longer served
* by the RegionServer.
*/
public static Map<String, Integer> uncacheStaleBlocks(final RpcController controller,
final AdminService.BlockingInterface admin) throws IOException {
UncacheStaleBlocksRequest request = UncacheStaleBlocksRequest.newBuilder().build();
UncacheStaleBlocksResponse response = null;
try {
response = admin.uncacheStaleBlocks(controller, request);
} catch (ServiceException se) {
throw getRemoteException(se);
}
return response.getUncachedFilesMap();
}

/**
* Get the list of region info from a GetOnlineRegionResponse
* @param proto the GetOnlineRegionResponse
hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto (11 additions, 0 deletions)
@@ -336,6 +336,14 @@ message ClearSlowLogResponses {
required bool is_cleaned = 1;
}

message UncacheStaleBlocksRequest {
}

message UncacheStaleBlocksResponse {
map<string, int32> uncached_files = 1;
}


service AdminService {
rpc GetRegionInfo(GetRegionInfoRequest)
returns(GetRegionInfoResponse);
@@ -415,4 +423,7 @@ service AdminService {
rpc GetCachedFilesList(GetCachedFilesListRequest)
returns(GetCachedFilesListResponse);

rpc UncacheStaleBlocks(UncacheStaleBlocksRequest)
returns(UncacheStaleBlocksResponse);

}
@@ -2146,6 +2146,5 @@ public Optional<Integer> getBlockSize(BlockCacheKey key) {
} else {
return Optional.of(entry.getOnDiskSizeWithHeader());
}

}
}
@@ -184,6 +184,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UncacheStaleBlocksRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UncacheStaleBlocksResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
@@ -3402,6 +3404,12 @@ public HBaseProtos.LogEntry getLogEntries(RpcController controller,
throw new ServiceException("Invalid request params");
}

@Override
public UncacheStaleBlocksResponse uncacheStaleBlocks(RpcController controller,
UncacheStaleBlocksRequest request) throws ServiceException {
throw new ServiceException(new DoNotRetryIOException("Unsupported method on master"));
}

private MasterProtos.BalancerDecisionsResponse
getBalancerDecisions(MasterProtos.BalancerDecisionsRequest request) {
final NamedQueueRecorder namedQueueRecorder = this.server.getNamedQueueRecorder();
@@ -58,9 +58,12 @@
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -1659,6 +1662,43 @@ public RegionLoad createRegionLoad(final String encodedRegionName) throws IOExce
return r != null ? createRegionLoad(r, null, null) : null;
}

public Map<String, Integer> uncacheStaleBlocks() {
Map<String, Pair<String, Long>> fullyCachedFiles =
this.getBlockCache().flatMap(BlockCache::getFullyCachedFiles).orElse(Collections.emptyMap());
Map<String, Integer> evictedFilesWithStaleBlocks = new ConcurrentHashMap<>();

ExecutorService executor = Executors.newFixedThreadPool(6);
Reviewer comment (Contributor):
The thread pool should be global for HRegionServer. We don't want a new thread pool created on every client call to this method; that could potentially kill the RS if an operator calls it multiple times in a short period.
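
A hedged sketch of what that suggestion could look like (the field, method, and configuration key names are illustrative, not part of this patch):

// Illustrative only: one executor owned by the RegionServer, created lazily and
// shut down when the server stops, instead of a new pool per RPC call.
private volatile ExecutorService staleBlockUncacheExecutor;

private ExecutorService getStaleBlockUncacheExecutor() {
  if (staleBlockUncacheExecutor == null) {
    synchronized (this) {
      if (staleBlockUncacheExecutor == null) {
        // "hbase.regionserver.uncache.stale.blocks.threads" is a hypothetical setting.
        staleBlockUncacheExecutor = Executors.newFixedThreadPool(
          conf.getInt("hbase.regionserver.uncache.stale.blocks.threads", 6));
      }
    }
  }
  return staleBlockUncacheExecutor;
}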


List<Callable<Void>> tasks = new ArrayList<>();

fullyCachedFiles.forEach((fileName, value) -> {
Callable<Void> task = () -> {
HRegion regionOnServer = getRegion(value.getFirst());
int blocksEvicted = (regionOnServer == null || !regionOnServer.isAvailable())
? this.getBlockCache().get().evictBlocksByHfileName(fileName)
: 0;
evictedFilesWithStaleBlocks.put(fileName, blocksEvicted);
LOG.info(
"Uncached {} blocks belonging to the file {} as the region {} "
+ "is not served by the region server {} anymore.",
blocksEvicted, fileName, value.getFirst(), this.getServerName());
return null;
};
tasks.add(task);
Reviewer comment (Contributor) on lines +1675 to +1687:
We should do a single task per call to the method, not a task per file; a task per file creates yet another collection in memory with as many objects as the total number of cached files. On large caches, that would have a noticeable impact.

});
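
A hedged sketch of the single-task shape the reviewer describes (illustrative only; it reuses the local variables from the patch above):

// Illustrative only: one task walks all fully cached files, rather than one task per file.
Callable<Void> sweep = () -> {
  fullyCachedFiles.forEach((fileName, value) -> {
    HRegion regionOnServer = getRegion(value.getFirst());
    int blocksEvicted = (regionOnServer == null || !regionOnServer.isAvailable())
      ? this.getBlockCache().get().evictBlocksByHfileName(fileName)
      : 0;
    evictedFilesWithStaleBlocks.put(fileName, blocksEvicted);
  });
  return null;
};
executor.submit(sweep);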

try {
executor.invokeAll(tasks);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.error("Thread interrupted while processing tasks for uncaching stale blocks: {}",
e.getMessage());
} finally {
executor.shutdown();
}
return evictedFilesWithStaleBlocks;
Reviewer comment (Contributor):
We should think about an alternative result here, as this map is effectively being updated by the background task, which can be confusing for the caller. I would rather make this void, or a boolean returning true to indicate that the task has been submitted and is running in the background.

}
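
A hedged sketch of the void/boolean alternative (illustrative only; getStaleBlockUncacheExecutor() refers to the hypothetical shared executor sketched earlier):

// Illustrative only: fire-and-forget; true means a background sweep was scheduled.
public boolean uncacheStaleBlocks() {
  Map<String, Pair<String, Long>> fullyCachedFiles =
    this.getBlockCache().flatMap(BlockCache::getFullyCachedFiles).orElse(Collections.emptyMap());
  if (fullyCachedFiles.isEmpty()) {
    return false;
  }
  getStaleBlockUncacheExecutor().submit(() -> fullyCachedFiles.forEach((fileName, value) -> {
    HRegion region = getRegion(value.getFirst());
    if (region == null || !region.isAvailable()) {
      this.getBlockCache().ifPresent(bc -> bc.evictBlocksByHfileName(fileName));
    }
  }));
  return true;
}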

/**
* Inner class that runs on a long period checking if regions need compaction.
*/
@@ -192,6 +192,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.RollWALWriterResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.StopServerResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UncacheStaleBlocksRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UncacheStaleBlocksResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
@@ -3964,4 +3966,13 @@ public GetCachedFilesListResponse getCachedFilesList(RpcController controller,
});
return responseBuilder.addAllCachedFiles(fullyCachedFiles).build();
}

@Override
public UncacheStaleBlocksResponse uncacheStaleBlocks(RpcController controller,
UncacheStaleBlocksRequest request) throws ServiceException {
UncacheStaleBlocksResponse.Builder responseBuilder = UncacheStaleBlocksResponse.newBuilder();
Map<String, Integer> evictedFilesWithStaleBlocks = new HashMap<>(server.uncacheStaleBlocks());
responseBuilder.putAllUncachedFiles(evictedFilesWithStaleBlocks);
return responseBuilder.build();
}
}