@@ -26,7 +26,6 @@
 import io.opentelemetry.api.trace.StatusCode;
 import io.opentelemetry.sdk.trace.data.EventData;
 import io.opentelemetry.sdk.trace.data.SpanData;
-import io.opentelemetry.sdk.trace.data.StatusData;
 import io.opentelemetry.semconv.trace.attributes.SemanticAttributes;
 import java.time.Duration;
 import java.util.Objects;
@@ -144,23 +143,20 @@ protected String featureValueOf(SpanData item) {
     };
   }
 
-  public static Matcher<SpanData> hasStatusWithCode(StatusCode statusCode) {
-    final Matcher<StatusCode> matcher = is(equalTo(statusCode));
-    return new TypeSafeMatcher<SpanData>() {
-      @Override
-      protected boolean matchesSafely(SpanData item) {
-        final StatusData statusData = item.getStatus();
-        return statusData != null && statusData.getStatusCode() != null
-          && matcher.matches(statusData.getStatusCode());
-      }
-
+  public static Matcher<SpanData> hasStatusWithCode(Matcher<StatusCode> matcher) {
+    return new FeatureMatcher<SpanData, StatusCode>(matcher, "SpanData with StatusCode that",
+      "statusWithCode") {
       @Override
-      public void describeTo(Description description) {
-        description.appendText("SpanData with StatusCode that ").appendDescriptionOf(matcher);
+      protected StatusCode featureValueOf(SpanData actual) {
+        return actual.getStatus().getStatusCode();
       }
     };
   }
 
+  public static Matcher<SpanData> hasStatusWithCode(StatusCode statusCode) {
+    return hasStatusWithCode(is(equalTo(statusCode)));
+  }
+
   public static Matcher<SpanData> hasTraceId(String traceId) {
     return hasTraceId(is(equalTo(traceId)));
   }
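
The hasStatusWithCode rework above replaces a hand-rolled TypeSafeMatcher with a FeatureMatcher and adds an overload that accepts a Matcher<StatusCode>, so status assertions can compose with other Hamcrest matchers; the original StatusCode-valued form now delegates to it. A minimal usage sketch (hypothetical test code, not part of the patch; it assumes the factory class above is named SpanDataMatchers and that the SpanData under test was captured by an in-memory exporter):

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;

import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.sdk.trace.data.SpanData;

public class SpanStatusAssertions {
  // Exact-value overload, unchanged for existing callers.
  static void assertSpanOk(SpanData span) {
    assertThat(span, SpanDataMatchers.hasStatusWithCode(StatusCode.OK));
  }

  // New overload: compose with any Matcher<StatusCode>.
  static void assertSpanCompleted(SpanData span) {
    assertThat(span, SpanDataMatchers.hasStatusWithCode(
      anyOf(is(equalTo(StatusCode.OK)), is(equalTo(StatusCode.ERROR)))));
  }
}

A FeatureMatcher also sharpens failure output: on mismatch it reports the extracted value ("statusWithCode was <UNSET>") rather than a generic description.
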
@@ -26,6 +26,7 @@
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -163,8 +164,9 @@ public boolean scheduleChore(ScheduledChore chore) {
         chore.getChoreService().cancelChore(chore);
       }
       chore.setChoreService(this);
-      ScheduledFuture<?> future = scheduler.scheduleAtFixedRate(chore, chore.getInitialDelay(),
-        chore.getPeriod(), chore.getTimeUnit());
+      ScheduledFuture<?> future =
+        scheduler.scheduleAtFixedRate(TraceUtil.tracedRunnable(chore, chore.getName()),
+          chore.getInitialDelay(), chore.getPeriod(), chore.getTimeUnit());
       scheduledChores.put(chore, future);
       return true;
     } catch (Exception e) {
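
The scheduleChore change means every execution of a chore now runs inside a span named after the chore, via the TraceUtil.tracedRunnable helper introduced in the next file. A self-contained sketch of the same pattern against a plain JDK executor (the task, delay, and span name here are illustrative, not from the patch):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.trace.TraceUtil;

public class TracedScheduling {
  public static void main(String[] args) {
    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    Runnable task = () -> System.out.println("checking compactions");
    // Each periodic execution is wrapped in its own span named "CompactionChecker",
    // mirroring the new TraceUtil.tracedRunnable(chore, chore.getName()) call site above.
    scheduler.scheduleAtFixedRate(
      TraceUtil.tracedRunnable(task, "CompactionChecker"), 0, 10, TimeUnit.SECONDS);
  }
}
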
@@ -135,6 +135,31 @@ private static void endSpan(CompletableFuture<?> future, Span span) {
     });
   }
 
+  /**
+   * Wrap the provided {@code runnable} in a {@link Runnable} that is traced.
+   */
+  public static Runnable tracedRunnable(final Runnable runnable, final String spanName) {
+    return tracedRunnable(runnable, () -> createSpan(spanName));
+  }
+
+  /**
+   * Wrap the provided {@code runnable} in a {@link Runnable} that is traced.
+   */
+  public static Runnable tracedRunnable(final Runnable runnable,
+    final Supplier<Span> spanSupplier) {
+    // N.B. This method name follows the convention of this class, i.e., tracedFuture, rather than
+    // the convention of the OpenTelemetry classes, i.e., Context#wrap.
+    return () -> {
+      final Span span = spanSupplier.get();
+      try (final Scope ignored = span.makeCurrent()) {
+        runnable.run();
+        span.setStatus(StatusCode.OK);
+      } finally {
+        span.end();
+      }
+    };
+  }
+
   /**
    * A {@link Runnable} that may also throw.
    * @param <T> the type of {@link Throwable} that can be produced.
@@ -144,11 +169,17 @@ public interface ThrowingRunnable<T extends Throwable> {
     void run() throws T;
   }
 
+  /**
+   * Trace the execution of {@code runnable}.
+   */
   public static <T extends Throwable> void trace(final ThrowingRunnable<T> runnable,
     final String spanName) throws T {
     trace(runnable, () -> createSpan(spanName));
   }
 
+  /**
+   * Trace the execution of {@code runnable}.
+   */
   public static <T extends Throwable> void trace(final ThrowingRunnable<T> runnable,
     final Supplier<Span> spanSupplier) throws T {
     Span span = spanSupplier.get();
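
The trace overloads accept a ThrowingRunnable<T>, so a block that throws a checked exception can be traced without wrapping or tunneling the exception: T propagates straight to the caller's throws clause. A usage sketch (loadConfig and readConfig are hypothetical):

import java.io.IOException;
import org.apache.hadoop.hbase.trace.TraceUtil;

public class TracedIo {
  void loadConfig() throws IOException {
    // T is inferred as IOException from the lambda; the exception, if thrown,
    // propagates unchanged while TraceUtil.trace manages the span's lifecycle.
    TraceUtil.trace(() -> readConfig("hbase-site.xml"), "TracedIo.loadConfig");
  }

  private void readConfig(String name) throws IOException {
    // hypothetical stand-in for real I/O work
  }
}
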
@@ -24,6 +24,7 @@
 import java.util.concurrent.ThreadFactory;
 import org.apache.hadoop.hbase.exceptions.DeserializationException;
 import org.apache.hadoop.hbase.master.RegionState;
+import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.types.CopyOnWriteArrayMap;
 import org.apache.hadoop.hbase.util.RetryCounter;
 import org.apache.hadoop.hbase.util.RetryCounterFactory;
@@ -101,41 +102,43 @@ public MetaRegionLocationCache(ZKWatcher zkWatcher) {
    * @param retryCounter controls the number of retries and sleep between retries.
    */
   private void loadMetaLocationsFromZk(RetryCounter retryCounter, ZNodeOpType opType) {
-    List<String> znodes = null;
-    while (retryCounter.shouldRetry()) {
-      try {
-        znodes = watcher.getMetaReplicaNodesAndWatchChildren();
-        break;
-      } catch (KeeperException ke) {
-        LOG.debug("Error populating initial meta locations", ke);
-        if (!retryCounter.shouldRetry()) {
-          // Retries exhausted and watchers not set. This is not a desirable state since the cache
-          // could remain stale forever. Propagate the exception.
-          watcher.abort("Error populating meta locations", ke);
-          return;
-        }
-        try {
-          retryCounter.sleepUntilNextRetry();
-        } catch (InterruptedException ie) {
-          LOG.error("Interrupted while loading meta locations from ZK", ie);
-          Thread.currentThread().interrupt();
-          return;
-        }
-      }
-    }
-    if (znodes == null || znodes.isEmpty()) {
-      // No meta znodes exist at this point but we registered a watcher on the base znode to listen
-      // for updates. They will be handled via nodeChildrenChanged().
-      return;
-    }
-    if (znodes.size() == cachedMetaLocations.size()) {
-      // No new meta znodes got added.
-      return;
-    }
-    for (String znode : znodes) {
-      String path = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, znode);
-      updateMetaLocation(path, opType);
-    }
+    TraceUtil.trace(() -> {
+      List<String> znodes = null;
+      while (retryCounter.shouldRetry()) {
+        try {
+          znodes = watcher.getMetaReplicaNodesAndWatchChildren();
+          break;
+        } catch (KeeperException ke) {
+          LOG.debug("Error populating initial meta locations", ke);
+          if (!retryCounter.shouldRetry()) {
+            // Retries exhausted and watchers not set. This is not a desirable state since the cache
+            // could remain stale forever. Propagate the exception.
+            watcher.abort("Error populating meta locations", ke);
+            return;
+          }
+          try {
+            retryCounter.sleepUntilNextRetry();
+          } catch (InterruptedException ie) {
+            LOG.error("Interrupted while loading meta locations from ZK", ie);
+            Thread.currentThread().interrupt();
+            return;
+          }
+        }
+      }
+      if (znodes == null || znodes.isEmpty()) {
+        // No meta znodes exist at this point but we registered a watcher on the base znode to
+        // listen for updates. They will be handled via nodeChildrenChanged().
+        return;
+      }
+      if (znodes.size() == cachedMetaLocations.size()) {
+        // No new meta znodes got added.
+        return;
+      }
+      for (String znode : znodes) {
+        String path = ZNodePaths.joinZNode(watcher.getZNodePaths().baseZNode, znode);
+        updateMetaLocation(path, opType);
+      }
+    }, "MetaRegionLocationCache.loadMetaLocationsFromZk");
   }
 
   /**
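
The conversion above is behavior-preserving because the lambda body is the entire method body; note, though, that each early return now exits only the traced lambda. In similar conversions that wrap only part of a method, that distinction matters, as this illustration shows (all names hypothetical, not from the patch):

import org.apache.hadoop.hbase.trace.TraceUtil;

public class PartialWrapPitfall {
  void refresh() {
    TraceUtil.trace(() -> {
      if (cacheIsFresh()) {
        return; // exits only the traced lambda...
      }
      reloadFromZk();
    }, "PartialWrapPitfall.refresh");
    notifyListeners(); // ...so this still runs, unlike a genuine early return.
  }

  private boolean cacheIsFresh() { return true; } // hypothetical
  private void reloadFromZk() { }                 // hypothetical
  private void notifyListeners() { }              // hypothetical
}
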
@@ -26,6 +26,9 @@
 import com.google.errorprone.annotations.RestrictedApi;
 import com.google.protobuf.Descriptors;
 import com.google.protobuf.Service;
+import io.opentelemetry.api.trace.Span;
+import io.opentelemetry.api.trace.StatusCode;
+import io.opentelemetry.context.Scope;
 import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.lang.reflect.Constructor;
@@ -213,6 +216,7 @@
 import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.SecurityConstants;
 import org.apache.hadoop.hbase.security.UserProvider;
+import org.apache.hadoop.hbase.trace.TraceUtil;
 import org.apache.hadoop.hbase.util.Addressing;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CoprocessorConfigurationUtil;
@@ -453,7 +457,8 @@ public class HMaster extends HRegionServer implements MasterServices {
    */
   public HMaster(final Configuration conf) throws IOException {
     super(conf);
-    try {
+    final Span span = TraceUtil.createSpan("HMaster.cxtor");
+    try (Scope ignored = span.makeCurrent()) {
       if (conf.getBoolean(MAINTENANCE_MODE, false)) {
         LOG.info("Detected {}=true via configuration.", MAINTENANCE_MODE);
         maintenanceMode = true;
@@ -514,11 +519,15 @@ public HMaster(final Configuration conf) throws IOException {
       cachedClusterId = new CachedClusterId(this, conf);
 
       this.regionServerTracker = new RegionServerTracker(zooKeeper, this);
+      span.setStatus(StatusCode.OK);
     } catch (Throwable t) {
       // Make sure we log the exception. HMaster is often started via reflection and the
       // cause of failed startup is lost.
+      TraceUtil.setError(span, t);
       LOG.error("Failed construction of Master", t);
       throw t;
+    } finally {
+      span.end();
     }
   }
 
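
The constructor manages its span by hand (createSpan, makeCurrent, setStatus, setError, end) rather than through a TraceUtil.trace lambda, which a lambda-based wrapper could not easily express here (final field assignment, a rethrown Throwable). The same idiom in isolation, as a sketch (doInit stands in for the guarded work; createSpan and setError are the TraceUtil helpers used above):

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.StatusCode;
import io.opentelemetry.context.Scope;
import org.apache.hadoop.hbase.trace.TraceUtil;

public class ManualSpanIdiom {
  void init() {
    final Span span = TraceUtil.createSpan("ManualSpanIdiom.init");
    try (Scope ignored = span.makeCurrent()) {
      doInit();                      // spans created here become children of this span
      span.setStatus(StatusCode.OK); // status stays UNSET unless set explicitly
    } catch (Throwable t) {
      TraceUtil.setError(span, t);   // record the failure before rethrowing
      throw t;
    } finally {
      span.end();                    // end exactly once, on every path
    }
  }

  private void doInit() { } // hypothetical
}
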
@@ -547,7 +556,7 @@ private void registerConfigurationObservers() {
   public void run() {
     try {
       registerConfigurationObservers();
-      Threads.setDaemonThreadRunning(new Thread(() -> {
+      Threads.setDaemonThreadRunning(new Thread(() -> TraceUtil.trace(() -> {
         try {
           int infoPort = putUpJettyServer();
           startActiveMasterManager(infoPort);
@@ -560,23 +569,29 @@ public void run() {
             abort(error, t);
           }
         }
-      }), getName() + ":becomeActiveMaster");
+      }, "HMaster.becomeActiveMaster")), getName() + ":becomeActiveMaster");
       // Fall in here even if we have been aborted. Need to run the shutdown services and
       // the super run call will do this for us.
       super.run();
     } finally {
-      if (this.clusterSchemaService != null) {
-        // If on way out, then we are no longer active master.
-        this.clusterSchemaService.stopAsync();
-        try {
-          this.clusterSchemaService
-            .awaitTerminated(getConfiguration().getInt(HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS,
-              DEFAULT_HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS), TimeUnit.SECONDS);
-        } catch (TimeoutException te) {
-          LOG.warn("Failed shutdown of clusterSchemaService", te);
-        }
-      }
-      this.activeMaster = false;
+      final Span span = TraceUtil.createSpan("HMaster exiting main loop");
+      try (Scope ignored = span.makeCurrent()) {
+        if (this.clusterSchemaService != null) {
+          // If on way out, then we are no longer active master.
+          this.clusterSchemaService.stopAsync();
+          try {
+            this.clusterSchemaService
+              .awaitTerminated(getConfiguration().getInt(HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS,
+                DEFAULT_HBASE_MASTER_WAIT_ON_SERVICE_IN_SECONDS), TimeUnit.SECONDS);
+          } catch (TimeoutException te) {
+            LOG.warn("Failed shutdown of clusterSchemaService", te);
+          }
+        }
+        this.activeMaster = false;
+        span.setStatus(StatusCode.OK);
+      } finally {
+        span.end();
+      }
     }
   }
 
@@ -3096,36 +3111,38 @@ public MemoryBoundedLogMessageBuffer getRegionServerFatalLogBuffer() {
    * Shutdown the cluster. Master runs a coordinated stop of all RegionServers and then itself.
    */
   public void shutdown() throws IOException {
-    if (cpHost != null) {
-      cpHost.preShutdown();
-    }
-
-    // Tell the servermanager cluster shutdown has been called. This makes it so when Master is
-    // last running server, it'll stop itself. Next, we broadcast the cluster shutdown by setting
-    // the cluster status as down. RegionServers will notice this change in state and will start
-    // shutting themselves down. When last has exited, Master can go down.
-    if (this.serverManager != null) {
-      this.serverManager.shutdownCluster();
-    }
-    if (this.clusterStatusTracker != null) {
-      try {
-        this.clusterStatusTracker.setClusterDown();
-      } catch (KeeperException e) {
-        LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e);
-      }
-    }
-    // Stop the procedure executor. Will stop any ongoing assign, unassign, server crash etc.,
-    // processing so we can go down.
-    if (this.procedureExecutor != null) {
-      this.procedureExecutor.stop();
-    }
-    // Shutdown our cluster connection. This will kill any hosted RPCs that might be going on;
-    // this is what we want especially if the Master is in startup phase doing call outs to
-    // hbase:meta, etc. when cluster is down. Without ths connection close, we'd have to wait on
-    // the rpc to timeout.
-    if (this.clusterConnection != null) {
-      this.clusterConnection.close();
-    }
+    TraceUtil.trace(() -> {
+      if (cpHost != null) {
+        cpHost.preShutdown();
+      }
+
+      // Tell the servermanager cluster shutdown has been called. This makes it so when Master is
+      // last running server, it'll stop itself. Next, we broadcast the cluster shutdown by setting
+      // the cluster status as down. RegionServers will notice this change in state and will start
+      // shutting themselves down. When last has exited, Master can go down.
+      if (this.serverManager != null) {
+        this.serverManager.shutdownCluster();
+      }
+      if (this.clusterStatusTracker != null) {
+        try {
+          this.clusterStatusTracker.setClusterDown();
+        } catch (KeeperException e) {
+          LOG.error("ZooKeeper exception trying to set cluster as down in ZK", e);
+        }
+      }
+      // Stop the procedure executor. Will stop any ongoing assign, unassign, server crash etc.,
+      // processing so we can go down.
+      if (this.procedureExecutor != null) {
+        this.procedureExecutor.stop();
+      }
+      // Shutdown our cluster connection. This will kill any hosted RPCs that might be going on;
+      // this is what we want especially if the Master is in startup phase doing call outs to
+      // hbase:meta, etc. when cluster is down. Without ths connection close, we'd have to wait on
+      // the rpc to timeout.
+      if (this.clusterConnection != null) {
+        this.clusterConnection.close();
+      }
+    }, "HMaster.shutdown");
   }
 
   public void stopMaster() throws IOException {