Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ public class MonitoredRPCHandlerImpl extends MonitoredTaskImpl implements Monito
private boolean snapshot = false;
private Map<String, Object> callInfoMap = new HashMap<>();

public MonitoredRPCHandlerImpl() {
super(false);
public MonitoredRPCHandlerImpl(String description) {
super(false, description);
// in this implementation, WAITING indicates that the handler is not
// actively servicing an RPC call.
setState(State.WAITING);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.hadoop.hbase.util.GsonUtil;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.gson.Gson;

Expand All @@ -46,11 +47,13 @@ class MonitoredTaskImpl implements MonitoredTask {

private static final Gson GSON = GsonUtil.createGson().create();

public MonitoredTaskImpl(boolean enableJournal) {
public MonitoredTaskImpl(boolean enableJournal, String description) {
startTime = EnvironmentEdgeManager.currentTime();
statusTime = startTime;
stateTime = startTime;
warnTime = startTime;
this.description = description;
this.status = "status unset";
if (enableJournal) {
journal = new ConcurrentLinkedQueue<>();
} else {
Expand Down Expand Up @@ -161,6 +164,7 @@ public void abort(String msg) {

@Override
public void setStatus(String status) {
Preconditions.checkNotNull(status, "Status is null");
this.status = status;
statusTime = EnvironmentEdgeManager.currentTime();
if (journal != null) {
Expand All @@ -175,6 +179,7 @@ protected void setState(State state) {

@Override
public void setDescription(String description) {
Preconditions.checkNotNull(description, "Description is null");
this.description = description;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,7 @@ public MonitoredTask createStatus(String description, boolean ignore) {

public synchronized MonitoredTask createStatus(String description, boolean ignore,
boolean enableJournal) {
MonitoredTask stat = new MonitoredTaskImpl(enableJournal);
stat.setDescription(description);
MonitoredTask stat = new MonitoredTaskImpl(enableJournal, description);
MonitoredTask proxy = (MonitoredTask) Proxy.newProxyInstance(stat.getClass().getClassLoader(),
new Class<?>[] { MonitoredTask.class }, new PassthroughInvocationHandler<>(stat));
TaskAndWeakRefPair pair = new TaskAndWeakRefPair(stat, proxy);
Expand All @@ -109,8 +108,7 @@ public synchronized MonitoredTask createStatus(String description, boolean ignor
}

public synchronized MonitoredRPCHandler createRPCStatus(String description) {
MonitoredRPCHandler stat = new MonitoredRPCHandlerImpl();
stat.setDescription(description);
MonitoredRPCHandler stat = new MonitoredRPCHandlerImpl(description);
MonitoredRPCHandler proxy =
(MonitoredRPCHandler) Proxy.newProxyInstance(stat.getClass().getClassLoader(),
new Class<?>[] { MonitoredRPCHandler.class }, new PassthroughInvocationHandler<>(stat));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public void testSimpleCall() {

TraceUtil.trace(() -> {
CallRunner cr = new CallRunner(mockRpcServer, mockCall);
cr.setStatus(new MonitoredRPCHandlerImpl());
cr.setStatus(new MonitoredRPCHandlerImpl("test"));
cr.run();
}, testName.getMethodName());

Expand All @@ -101,7 +101,7 @@ public void testCallCleanup() {

TraceUtil.trace(() -> {
CallRunner cr = new CallRunner(mockRpcServer, mockCall);
cr.setStatus(new MonitoredRPCHandlerImpl());
cr.setStatus(new MonitoredRPCHandlerImpl("test"));
cr.run();
}, testName.getMethodName());
Mockito.verify(mockCall, Mockito.times(1)).cleanup();
Expand All @@ -116,7 +116,7 @@ public void testCallRunnerDropDisconnected() {

TraceUtil.trace(() -> {
CallRunner cr = new CallRunner(mockRpcServer, mockCall);
cr.setStatus(new MonitoredRPCHandlerImpl());
cr.setStatus(new MonitoredRPCHandlerImpl("test"));
cr.drop();
}, testName.getMethodName());
Mockito.verify(mockCall, Mockito.times(1)).cleanup();
Expand All @@ -142,7 +142,7 @@ public void testCallRunnerDropConnected() {

TraceUtil.trace(() -> {
CallRunner cr = new CallRunner(mockRpcServer, mockCall);
cr.setStatus(new MonitoredRPCHandlerImpl());
cr.setStatus(new MonitoredRPCHandlerImpl("test"));
cr.drop();
}, testName.getMethodName());
Mockito.verify(mockCall, Mockito.times(1)).cleanup();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ public void testCallQueueInfo() throws IOException, InterruptedException {

for (int i = totalCallMethods; i > 0; i--) {
CallRunner task = createMockTask();
task.setStatus(new MonitoredRPCHandlerImpl());
task.setStatus(new MonitoredRPCHandlerImpl("test"));

if (!scheduler.dispatch(task)) {
unableToDispatch++;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void testBasic() throws IOException, InterruptedException {
scheduler.init(CONTEXT);
scheduler.start();
CallRunner task = createMockTask();
task.setStatus(new MonitoredRPCHandlerImpl());
task.setStatus(new MonitoredRPCHandlerImpl("test"));
scheduler.dispatch(task);
verify(task, timeout(10000)).run();
scheduler.stop();
Expand Down Expand Up @@ -164,7 +164,7 @@ public void testCallQueueInfo() throws IOException, InterruptedException {
int totalCallMethods = 10;
for (int i = totalCallMethods; i > 0; i--) {
CallRunner task = createMockTask();
task.setStatus(new MonitoredRPCHandlerImpl());
task.setStatus(new MonitoredRPCHandlerImpl("test"));
scheduler.dispatch(task);
}

Expand Down Expand Up @@ -205,7 +205,7 @@ public Void answer(InvocationOnMock invocationOnMock) throws Throwable {
}
};
for (CallRunner task : tasks) {
task.setStatus(new MonitoredRPCHandlerImpl());
task.setStatus(new MonitoredRPCHandlerImpl("test"));
doAnswer(answerToRun).when(task).run();
}

Expand Down Expand Up @@ -524,7 +524,7 @@ public void testScanQueues() throws Exception {

private void doAnswerTaskExecution(final CallRunner callTask, final ArrayList<Integer> results,
final int value, final int sleepInterval) {
callTask.setStatus(new MonitoredRPCHandlerImpl());
callTask.setStatus(new MonitoredRPCHandlerImpl("test"));
doAnswer(new Answer<Object>() {
@Override
public Object answer(InvocationOnMock invocation) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@ public void testTaskMonitorBasics() {
assertEquals(task.getDescription(), taskFromTm.getDescription());
assertEquals(-1, taskFromTm.getCompletionTimestamp());
assertEquals(MonitoredTask.State.RUNNING, taskFromTm.getState());
assertEquals(task.getStatus(), taskFromTm.getStatus());
assertEquals("status unset", taskFromTm.getStatus());

// Mark it as finished
task.markComplete("Finished!");
Expand Down Expand Up @@ -228,7 +230,7 @@ public void testStatusJournal() {

@Test
public void testClone() throws Exception {
MonitoredRPCHandlerImpl monitor = new MonitoredRPCHandlerImpl();
MonitoredRPCHandlerImpl monitor = new MonitoredRPCHandlerImpl("test");
monitor.abort("abort RPC");
TestParam testParam = new TestParam("param1");
monitor.setRPC("method1", new Object[] { testParam }, 0);
Expand Down