Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ public class ClientContext {
*/
private final String name;

/**
* The client conf used to initialize context.
*/
private final DfsClientConf dfsClientConf;

/**
* String representation of the configuration.
*/
Expand Down Expand Up @@ -130,6 +135,17 @@ public class ClientContext {
*/
private volatile DeadNodeDetector deadNodeDetector = null;

/**
* The switch for the {@link LocatedBlocksRefresher}.
*/
private final boolean locatedBlocksRefresherEnabled;

/**
* Periodically refresh the {@link org.apache.hadoop.hdfs.protocol.LocatedBlocks} backing
* registered {@link DFSInputStream}s, to take advantage of changes in block placement.
*/
private volatile LocatedBlocksRefresher locatedBlocksRefresher = null;

/**
* Count the reference of ClientContext.
*/
Expand All @@ -146,6 +162,7 @@ private ClientContext(String name, DfsClientConf conf,
final ShortCircuitConf scConf = conf.getShortCircuitConf();

this.name = name;
this.dfsClientConf = conf;
this.confString = scConf.confAsString();
this.clientShortCircuitNum = conf.getClientShortCircuitNum();
this.shortCircuitCache = new ShortCircuitCache[this.clientShortCircuitNum];
Expand All @@ -164,6 +181,7 @@ private ClientContext(String name, DfsClientConf conf,
this.byteArrayManager = ByteArrayManager.newInstance(
conf.getWriteByteArrayManagerConf());
this.deadNodeDetectionEnabled = conf.isDeadNodeDetectionEnabled();
this.locatedBlocksRefresherEnabled = conf.isLocatedBlocksRefresherEnabled();
initTopologyResolution(config);
}

Expand Down Expand Up @@ -301,6 +319,21 @@ public DeadNodeDetector getDeadNodeDetector() {
return deadNodeDetector;
}

/**
* If true, LocatedBlocksRefresher will be periodically refreshing LocatedBlocks
* of registered DFSInputStreams.
*/
public boolean isLocatedBlocksRefresherEnabled() {
return locatedBlocksRefresherEnabled;
}

/**
* Obtain LocatedBlocksRefresher of the current client.
*/
public LocatedBlocksRefresher getLocatedBlocksRefresher() {
return locatedBlocksRefresher;
}

/**
* Increment the counter. Start the dead node detector thread if there is no
* reference.
Expand All @@ -311,6 +344,10 @@ synchronized void reference() {
deadNodeDetector = new DeadNodeDetector(name, configuration);
deadNodeDetector.start();
}
if (locatedBlocksRefresherEnabled && locatedBlocksRefresher == null) {
locatedBlocksRefresher = new LocatedBlocksRefresher(name, configuration, dfsClientConf);
locatedBlocksRefresher.start();
}
}

/**
Expand All @@ -324,5 +361,10 @@ synchronized void unreference() {
deadNodeDetector.shutdown();
deadNodeDetector = null;
}

if (counter == 0 && locatedBlocksRefresherEnabled && locatedBlocksRefresher != null) {
locatedBlocksRefresher.shutdown();
locatedBlocksRefresher = null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -863,7 +863,7 @@ public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
}

public long getRefreshReadBlkLocationsInterval() {
return dfsClientConf.getRefreshReadBlockLocationsMS();
return dfsClientConf.getLocatedBlocksRefresherInterval();
}

/**
Expand Down Expand Up @@ -3459,4 +3459,36 @@ private boolean isDeadNodeDetectionEnabled() {
public DeadNodeDetector getDeadNodeDetector() {
return clientContext.getDeadNodeDetector();
}

/**
* Obtain LocatedBlocksRefresher of the current client.
*/
public LocatedBlocksRefresher getLocatedBlockRefresher() {
return clientContext.getLocatedBlocksRefresher();
}

/**
* Adds the {@link DFSInputStream} to the {@link LocatedBlocksRefresher}, so that
* the underlying {@link LocatedBlocks} is periodically refreshed.
*/
public void addLocatedBlocksRefresh(DFSInputStream dfsInputStream) {
if (isLocatedBlocksRefresherEnabled()) {
clientContext.getLocatedBlocksRefresher().addInputStream(dfsInputStream);
}
}

/**
* Removes the {@link DFSInputStream} from the {@link LocatedBlocksRefresher}, so that
* the underlying {@link LocatedBlocks} is no longer periodically refreshed.
* @param dfsInputStream
*/
public void removeLocatedBlocksRefresh(DFSInputStream dfsInputStream) {
if (isLocatedBlocksRefresherEnabled()) {
clientContext.getLocatedBlocksRefresher().removeInputStream(dfsInputStream);
}
}

private boolean isLocatedBlocksRefresherEnabled() {
return clientContext.isLocatedBlocksRefresherEnabled();
}
}
Loading