@@ -0,0 +1,105 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.protocolPB;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;

import com.google.protobuf.ProtocolMessageEnum;

/**
* Metrics source that counts every subtype of a specific protocol message.
*/
public class ProtocolMessageMetrics implements MetricsSource {

private String name;

private String description;

private Map<ProtocolMessageEnum, AtomicLong> counters =
new ConcurrentHashMap<>();

public static ProtocolMessageMetrics create(String name,
    String description, ProtocolMessageEnum[] types) {
  return new ProtocolMessageMetrics(name, description, types);
}

public ProtocolMessageMetrics(String name, String description,
ProtocolMessageEnum[] values) {
this.name = name;
this.description = description;
for (ProtocolMessageEnum value : values) {
counters.put(value, new AtomicLong(0));
}
}

public void increment(ProtocolMessageEnum key) {
counters.get(key).incrementAndGet();
}

public void register() {
DefaultMetricsSystem.instance()
.register(name, description, this);
}

public void unregister() {
DefaultMetricsSystem.instance().unregisterSource(name);
}

@Override
public void getMetrics(MetricsCollector collector, boolean all) {
MetricsRecordBuilder builder = collector.addRecord(name);
counters.forEach((key, value) -> {
builder.addCounter(new MetricName(key.toString(), ""), value.longValue());
});
builder.endRecord();
}

/**
* Simple metrics info implementation.
*/
public static class MetricName implements MetricsInfo {
private String name;
private String description;

public MetricName(String name, String description) {
this.name = name;
this.description = description;
}

@Override
public String name() {
return name;
}

@Override
public String description() {
return description;
}
}
}
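
For reference, a minimal hypothetical exercise of the new class. The demo class name and metrics prefix are assumptions; any protobuf-generated enum implements ProtocolMessageEnum, so the DescriptorProtos enum bundled with protobuf-java serves for illustration:

// Hypothetical demo, assumed to live in the same package as
// ProtocolMessageMetrics.
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;

import com.google.protobuf.DescriptorProtos.FieldDescriptorProto.Type;

public final class ProtocolMessageMetricsDemo {
  public static void main(String[] args) {
    DefaultMetricsSystem.initialize("Demo"); // assumed prefix, demo only
    ProtocolMessageMetrics metrics = ProtocolMessageMetrics.create(
        "DemoProtocolMessages", "Message counters for a demo protocol",
        Type.values());
    metrics.register();                   // expose through the metrics system
    metrics.increment(Type.TYPE_STRING);  // one AtomicLong per enum subtype
    metrics.increment(Type.TYPE_STRING);
    metrics.increment(Type.TYPE_INT64);
    metrics.unregister();                 // detach on shutdown
  }
}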
@@ -17,15 +17,25 @@
*/
package org.apache.hadoop.ozone.protocolPB;

import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import io.opentracing.Scope;
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
.AllocateBlockResponse;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateBlockResponse;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateScmBlockRequestProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateScmBlockResponseProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteKeyBlocksResultProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksRequestProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksResponseProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationRequest;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SCMBlockLocationResponse;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SortDatanodesRequestProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.SortDatanodesResponseProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.Status;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
@@ -34,34 +44,15 @@
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
.AllocateScmBlockRequestProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
.AllocateScmBlockResponseProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
.DeleteKeyBlocksResultProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
.DeleteScmKeyBlocksRequestProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
.DeleteScmKeyBlocksResponseProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
.SCMBlockLocationResponse;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
.SCMBlockLocationRequest;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
.Status;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
.SortDatanodesRequestProto;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
.SortDatanodesResponseProto;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ozone.common.BlockGroup;
import org.apache.hadoop.ozone.common.DeleteBlockGroupResult;

import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import io.opentracing.Scope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* This class is the server-side translator that forwards requests received on
@@ -74,14 +65,22 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB

private final ScmBlockLocationProtocol impl;

private static final Logger LOG = LoggerFactory
.getLogger(ScmBlockLocationProtocolServerSideTranslatorPB.class);

private final ProtocolMessageMetrics protocolMessageMetrics;

/**
* Creates a new ScmBlockLocationProtocolServerSideTranslatorPB.
*
* @param impl {@link ScmBlockLocationProtocol} server implementation
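* @param metrics {@link ProtocolMessageMetrics} used to count each message type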
*/
public ScmBlockLocationProtocolServerSideTranslatorPB(
ScmBlockLocationProtocol impl) throws IOException {
ScmBlockLocationProtocol impl,
ProtocolMessageMetrics metrics) throws IOException {
this.impl = impl;
this.protocolMessageMetrics = metrics;
}

private SCMBlockLocationResponse.Builder createSCMBlockResponse(
@@ -97,15 +96,45 @@ public SCMBlockLocationResponse send(RpcController controller,
SCMBlockLocationRequest request) throws ServiceException {
String traceId = request.getTraceID();

if (LOG.isTraceEnabled()) {
LOG.trace("BlockLocationProtocol {} request is received: <json>{}</json>",
request.getCmdType().toString(),
request.toString().replaceAll("\n", "\\\\n"));

} else if (LOG.isDebugEnabled()) {
LOG.debug("BlockLocationProtocol {} request is received",
request.getCmdType().toString());
}

protocolMessageMetrics.increment(request.getCmdType());

try (Scope scope = TracingUtil
.importAndCreateScope(
"ScmBlockLocationProtocol." + request.getCmdType(),
request.getTraceID())) {
SCMBlockLocationResponse response =
processMessage(request, traceId);

if (LOG.isTraceEnabled()) {
LOG.trace(
"BlockLocationProtocol {} request is processed. Response: "
+ "<json>{}</json>",
request.getCmdType().toString(),
response.toString().replaceAll("\n", "\\\\n"));
}
return response;
}
}

private SCMBlockLocationResponse processMessage(
SCMBlockLocationRequest request, String traceId) throws ServiceException {
SCMBlockLocationResponse.Builder response = createSCMBlockResponse(
request.getCmdType(),
traceId);
response.setSuccess(true);
response.setStatus(Status.OK);

try(Scope scope = TracingUtil
.importAndCreateScope("ScmBlockLocationProtocol."+request.getCmdType(),
request.getTraceID())) {
try {
switch (request.getCmdType()) {
case AllocateScmBlock:
response.setAllocateScmBlockResponse(
Expand All @@ -125,7 +154,7 @@ public SCMBlockLocationResponse send(RpcController controller,
break;
default:
// Should never happen
throw new IOException("Unknown Operation "+request.getCmdType()+
throw new IOException("Unknown Operation " + request.getCmdType() +
" in ScmBlockLocationProtocol");
}
} catch (IOException e) {
Expand All @@ -135,6 +164,7 @@ public SCMBlockLocationResponse send(RpcController controller,
response.setMessage(e.getMessage());
}
}

return response.build();
}

@@ -182,12 +212,12 @@ public DeleteScmKeyBlocksResponseProto deleteScmKeyBlocks(
.map(BlockGroup::getFromProto).collect(Collectors.toList());
final List<DeleteBlockGroupResult> results =
impl.deleteKeyBlocks(infoList);
for (DeleteBlockGroupResult result: results) {
for (DeleteBlockGroupResult result : results) {
DeleteKeyBlocksResultProto.Builder deleteResult =
DeleteKeyBlocksResultProto
.newBuilder()
.setObjectKey(result.getObjectKey())
.addAllBlockResults(result.getBlockResultProtoList());
.newBuilder()
.setObjectKey(result.getObjectKey())
.addAllBlockResults(result.getBlockResultProtoList());
resp.addResults(deleteResult.build());
}
return resp.build();
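
Outside this diff, the SCM server presumably constructs the translator with an already-registered metrics instance. A sketch of that wiring; the enum name Type, the metric name, and the helper method are assumptions, not shown here:

// Hypothetical bootstrap helper, assuming ScmBlockLocationProtocolProtos.Type
// is the command enum behind request.getCmdType():
static ScmBlockLocationProtocolServerSideTranslatorPB newTranslator(
    ScmBlockLocationProtocol impl) throws IOException {
  ProtocolMessageMetrics metrics = ProtocolMessageMetrics.create(
      "ScmBlockLocationProtocol",
      "SCM block location protocol message counters",
      ScmBlockLocationProtocolProtos.Type.values());
  metrics.register();
  return new ScmBlockLocationProtocolServerSideTranslatorPB(impl, metrics);
}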
2 changes: 1 addition & 1 deletion hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -331,7 +331,7 @@

<property>
<name>hdds.prometheus.endpoint.enabled</name>
<value>false</value>
<value>true</value>
<tag>OZONE, MANAGEMENT</tag>
<description>Enable prometheus compatible metric page on the HTTP
servers.
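
With the default flipped to true, deployments that do not want the endpoint can presumably still override it in ozone-site.xml:

<property>
  <name>hdds.prometheus.endpoint.enabled</name>
  <value>false</value>
</property>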
@@ -93,21 +93,21 @@ public boolean process(Set<? extends TypeElement> annotations,
}

}
FileObject resource = filer
.createResource(StandardLocation.CLASS_OUTPUT, "",
OUTPUT_FILE_NAME);
}
FileObject resource = filer
.createResource(StandardLocation.CLASS_OUTPUT, "",
OUTPUT_FILE_NAME);

try (Writer writer = new OutputStreamWriter(
resource.openOutputStream(), StandardCharsets.UTF_8)) {
appender.write(writer);
}
try (Writer writer = new OutputStreamWriter(
resource.openOutputStream(), StandardCharsets.UTF_8)) {
appender.write(writer);
}

} catch (IOException e) {
processingEnv.getMessager().printMessage(Kind.ERROR,
"Can't generate the config file from annotation: " + e.getMessage());
}
return false;
}


}
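
The brace change above appears to move the resource creation and write out of the per-element loop, so the output file is created once per processing round (Filer.createResource fails when the same file is created twice). A self-contained sketch of that corrected shape, with assumed names (ExampleConfigProcessor, example-output.txt):

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;
import java.util.Set;
import javax.annotation.processing.AbstractProcessor;
import javax.annotation.processing.RoundEnvironment;
import javax.annotation.processing.SupportedAnnotationTypes;
import javax.lang.model.element.Element;
import javax.lang.model.element.TypeElement;
import javax.tools.Diagnostic.Kind;
import javax.tools.FileObject;
import javax.tools.StandardLocation;

@SupportedAnnotationTypes("*")
public class ExampleConfigProcessor extends AbstractProcessor {

  @Override
  public boolean process(Set<? extends TypeElement> annotations,
      RoundEnvironment roundEnv) {
    if (annotations.isEmpty()) {
      return false; // nothing to do in this round
    }
    StringBuilder appender = new StringBuilder();
    try {
      // First pass: only collect content for every annotated element.
      for (TypeElement annotation : annotations) {
        for (Element element : roundEnv.getElementsAnnotatedWith(annotation)) {
          appender.append(element.getSimpleName()).append('\n');
        }
      }
      // Then create and write the resource exactly once, after the loop.
      FileObject resource = processingEnv.getFiler().createResource(
          StandardLocation.CLASS_OUTPUT, "", "example-output.txt");
      try (Writer writer = new OutputStreamWriter(
          resource.openOutputStream(), StandardCharsets.UTF_8)) {
        writer.write(appender.toString());
      }
    } catch (IOException e) {
      processingEnv.getMessager().printMessage(Kind.ERROR,
          "Can't generate the config file from annotation: " + e.getMessage());
    }
    return false;
  }
}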
@@ -26,6 +26,7 @@
import org.apache.hadoop.http.HttpServer2;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;

import org.eclipse.jetty.webapp.WebAppContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -92,8 +93,9 @@ public BaseHttpServer(Configuration conf, String name) throws IOException {
httpServer = builder.build();
httpServer.addServlet("conf", "/conf", HddsConfServlet.class);

httpServer.addServlet("logstream", "/logstream", LogStreamServlet.class);
prometheusSupport =
conf.getBoolean(HddsConfigKeys.HDDS_PROMETHEUS_ENABLED, false);
conf.getBoolean(HddsConfigKeys.HDDS_PROMETHEUS_ENABLED, true);

profilerSupport =
conf.getBoolean(HddsConfigKeys.HDDS_PROFILER_ENABLED, false);