diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/GraphiteSink.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/GraphiteSink.java index ea1bde3a75e03..64eedeaba4aab 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/GraphiteSink.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/sink/GraphiteSink.java @@ -42,166 +42,173 @@ @InterfaceAudience.Public @InterfaceStability.Evolving public class GraphiteSink implements MetricsSink, Closeable { - private static final Logger LOG = - LoggerFactory.getLogger(GraphiteSink.class); - private static final String SERVER_HOST_KEY = "server_host"; - private static final String SERVER_PORT_KEY = "server_port"; - private static final String METRICS_PREFIX = "metrics_prefix"; - private String metricsPrefix = null; - private Graphite graphite = null; - - @Override - public void init(SubsetConfiguration conf) { - // Get Graphite host configurations. - final String serverHost = conf.getString(SERVER_HOST_KEY); - final int serverPort = Integer.parseInt(conf.getString(SERVER_PORT_KEY)); - - // Get Graphite metrics graph prefix. - metricsPrefix = conf.getString(METRICS_PREFIX); - if (metricsPrefix == null) - metricsPrefix = ""; - - graphite = new Graphite(serverHost, serverPort); - graphite.connect(); + private static final Logger LOG = + LoggerFactory.getLogger(GraphiteSink.class); + private static final String SERVER_HOST_KEY = "server_host"; + private static final String SERVER_PORT_KEY = "server_port"; + private static final String METRICS_PREFIX = "metrics_prefix"; + private String metricsPrefix = null; + private Graphite graphite = null; + + @Override + public void init(SubsetConfiguration conf) { + // Get Graphite host configurations. + final String serverHost = conf.getString(SERVER_HOST_KEY); + final int serverPort = Integer.parseInt(conf.getString(SERVER_PORT_KEY)); + + // Get Graphite metrics graph prefix. + metricsPrefix = conf.getString(METRICS_PREFIX); + if (metricsPrefix == null) { + metricsPrefix = ""; + } + graphite = new Graphite(serverHost, serverPort); + graphite.connect(); + } + + @Override + public void putMetrics(MetricsRecord record) { + StringBuilder lines = new StringBuilder(); + StringBuilder metricsPathPrefix = new StringBuilder(); + StringBuilder pointTags = new StringBuilder(""); + + // Configure the hierarchical place to display the graph. + metricsPathPrefix.append(metricsPrefix).append(".") + .append(record.context()).append(".").append(record.name()); + + // collect point tags to be appended at the end of the metric name + for (MetricsTag tag : record.tags()) { + if (tag.value() != null && tag.value().trim().length() > 0) { + pointTags.append(";"); + pointTags.append(tag.name().replace(' ', '_')); + pointTags.append("="); + pointTags.append(tag.value().replace(' ', '_')); + } } - @Override - public void putMetrics(MetricsRecord record) { - StringBuilder lines = new StringBuilder(); - StringBuilder metricsPathPrefix = new StringBuilder(); - - // Configure the hierarchical place to display the graph. - metricsPathPrefix.append(metricsPrefix).append(".") - .append(record.context()).append(".").append(record.name()); - - for (MetricsTag tag : record.tags()) { - if (tag.value() != null) { - metricsPathPrefix.append(".") - .append(tag.name()) - .append("=") - .append(tag.value()); - } - } - - // The record timestamp is in milliseconds while Graphite expects an epoc time in seconds. - long timestamp = record.timestamp() / 1000L; - - // Collect datapoints. - for (AbstractMetric metric : record.metrics()) { - lines.append( - metricsPathPrefix.toString() + "." - + metric.name().replace(' ', '.')).append(" ") - .append(metric.value()).append(" ").append(timestamp) - .append("\n"); - } - - try { - graphite.write(lines.toString()); - } catch (Exception e) { - LOG.warn("Error sending metrics to Graphite", e); - try { - graphite.close(); - } catch (Exception e1) { - throw new MetricsException("Error closing connection to Graphite", e1); - } - } + // The record timestamp is in milliseconds while Graphite expects an epoc + // time in seconds. + long timestamp = record.timestamp() / 1000L; + + // Collect datapoints. + for (AbstractMetric metric : record.metrics()) { + lines.append( + metricsPathPrefix.toString() + "." + + metric.name().replace(' ', '.')).append(pointTags.toString()) + .append(" ") + .append(metric.value()).append(" ").append(timestamp) + .append("\n"); } - @Override - public void flush() { + try { + graphite.write(lines.toString()); + } catch (Exception e) { + LOG.warn("Error sending metrics to Graphite", e); try { - graphite.flush(); - } catch (Exception e) { - LOG.warn("Error flushing metrics to Graphite", e); - try { - graphite.close(); - } catch (Exception e1) { - throw new MetricsException("Error closing connection to Graphite", e1); - } + graphite.close(); + } catch (Exception e1) { + throw new MetricsException("Error closing connection to Graphite", e1); } } - - @Override - public void close() throws IOException { - graphite.close(); + } + + @Override + public void flush() { + try { + graphite.flush(); + } catch (Exception e) { + LOG.warn("Error flushing metrics to Graphite", e); + try { + graphite.close(); + } catch (Exception e1) { + throw new MetricsException("Error closing connection to Graphite", e1); + } + } + } + + @Override + public void close() throws IOException { + graphite.close(); + } + + /** + * internal class for managing connection and writing + * metrics to Graphite server. + */ + public static class Graphite { + private final static int MAX_CONNECTION_FAILURES = 5; + private String serverHost; + private int serverPort; + private Writer writer = null; + private Socket socket = null; + private int connectionFailures = 0; + + public Graphite(String serverHost, int serverPort) { + this.serverHost = serverHost; + this.serverPort = serverPort; } - public static class Graphite { - private final static int MAX_CONNECTION_FAILURES = 5; - - private String serverHost; - private int serverPort; - private Writer writer = null; - private Socket socket = null; - private int connectionFailures = 0; - - public Graphite(String serverHost, int serverPort) { - this.serverHost = serverHost; - this.serverPort = serverPort; + public void connect() { + if (isConnected()) { + throw new MetricsException("Already connected to Graphite"); } - - public void connect() { - if (isConnected()) { - throw new MetricsException("Already connected to Graphite"); - } - if (tooManyConnectionFailures()) { - // return silently (there was ERROR in logs when we reached limit for the first time) - return; - } - try { - // Open a connection to Graphite server. - socket = new Socket(serverHost, serverPort); + if (tooManyConnectionFailures()) { + // return silently (there was ERROR in logs when we reached limit for + // the first time) + return; + } + try { + // Open a connection to Graphite server. + socket = new Socket(serverHost, serverPort); writer = new OutputStreamWriter(socket.getOutputStream(), - StandardCharsets.UTF_8); - } catch (Exception e) { - connectionFailures++; - if (tooManyConnectionFailures()) { - // first time when connection limit reached, report to logs - LOG.error("Too many connection failures, would not try to connect again."); - } - throw new MetricsException("Error creating connection, " - + serverHost + ":" + serverPort, e); + StandardCharsets.UTF_8); + } catch (Exception e) { + connectionFailures++; + if (tooManyConnectionFailures()) { + // first time when connection limit reached, report to logs + LOG.error("Too many connection failures, would not try to " + + "connect again."); } + throw new MetricsException("Error creating connection, " + + serverHost + ":" + serverPort, e); } + } - public void write(String msg) throws IOException { - if (!isConnected()) { - connect(); - } - if (isConnected()) { - writer.write(msg); - } + public void write(String msg) throws IOException { + if (!isConnected()) { + connect(); } - - public void flush() throws IOException { - if (isConnected()) { - writer.flush(); - } + if (isConnected()) { + writer.write(msg); } + } - public boolean isConnected() { - return socket != null && socket.isConnected() && !socket.isClosed(); + public void flush() throws IOException { + if (isConnected()) { + writer.flush(); } + } - public void close() throws IOException { - try { - if (writer != null) { - writer.close(); - } - } catch (IOException ex) { - if (socket != null) { - socket.close(); - } - } finally { - socket = null; - writer = null; - } - } + public boolean isConnected() { + return socket != null && socket.isConnected() && !socket.isClosed(); + } - private boolean tooManyConnectionFailures() { - return connectionFailures > MAX_CONNECTION_FAILURES; + public void close() throws IOException { + try { + if (writer != null) { + writer.close(); + } + } catch (IOException ex) { + if (socket != null) { + socket.close(); + } + } finally { + socket = null; + writer = null; } - } + private boolean tooManyConnectionFailures() { + return connectionFailures > MAX_CONNECTION_FAILURES; + } + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGraphiteMetrics.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGraphiteMetrics.java index 743080acd7a5e..db58217c59fa7 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGraphiteMetrics.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestGraphiteMetrics.java @@ -44,172 +44,191 @@ public class TestGraphiteMetrics { - private AbstractMetric makeMetric(String name, Number value) { - AbstractMetric metric = mock(AbstractMetric.class); - when(metric.name()).thenReturn(name); - when(metric.value()).thenReturn(value); - return metric; + private AbstractMetric makeMetric(String name, Number value) { + AbstractMetric metric = mock(AbstractMetric.class); + when(metric.name()).thenReturn(name); + when(metric.value()).thenReturn(value); + return metric; + } + + private GraphiteSink.Graphite makeGraphite() { + GraphiteSink.Graphite mockGraphite = mock(GraphiteSink.Graphite.class); + when(mockGraphite.isConnected()).thenReturn(true); + return mockGraphite; + } + + @SuppressWarnings("deprecation") + @Test + public void testPutMetrics() { + GraphiteSink sink = new GraphiteSink(); + List tags = new ArrayList(); + tags.add(new MetricsTag(MsInfo.Context, "all")); + tags.add(new MetricsTag(MsInfo.Hostname, "host")); + Set metrics = new HashSet(); + metrics.add(makeMetric("foo1", 1.25)); + metrics.add(makeMetric("foo2", 2.25)); + MetricsRecord record = new MetricsRecordImpl(MsInfo.Context, + (long) 10000, tags, metrics); + + ArgumentCaptor argument = ArgumentCaptor.forClass(String.class); + final GraphiteSink.Graphite mockGraphite = makeGraphite(); + Whitebox.setInternalState(sink, "graphite", mockGraphite); + sink.putMetrics(record); + + try { + verify(mockGraphite).write(argument.capture()); + } catch (IOException e) { + e.printStackTrace(); } - private GraphiteSink.Graphite makeGraphite() { - GraphiteSink.Graphite mockGraphite = mock(GraphiteSink.Graphite.class); - when(mockGraphite.isConnected()).thenReturn(true); - return mockGraphite; + String result = argument.getValue(); + + assertEquals(true, + result.equals( + "null.all.Context.foo1;Context=all;Hostname=host 1.25 10\n" + + "null.all.Context.foo2;Context=all;Hostname=host 2.25 10\n") || + result.equals( + "null.all.Context.foo2;Context=all;Hostname=host 2.25 10\n" + + "null.all.Context.foo1;Context=all;Hostname=host 1.25 10\n")); + } + + @SuppressWarnings("deprecation") + @Test + public void testPutMetrics2() { + GraphiteSink sink = new GraphiteSink(); + List tags = new ArrayList(); + tags.add(new MetricsTag(MsInfo.Context, "all")); + tags.add(new MetricsTag(MsInfo.Hostname, null)); + Set metrics = new HashSet(); + metrics.add(makeMetric("foo1", 1)); + metrics.add(makeMetric("foo2", 2)); + MetricsRecord record = new MetricsRecordImpl(MsInfo.Context, + (long) 10000, tags, metrics); + + ArgumentCaptor argument = ArgumentCaptor.forClass(String.class); + final GraphiteSink.Graphite mockGraphite = makeGraphite(); + Whitebox.setInternalState(sink, "graphite", mockGraphite); + sink.putMetrics(record); + + try { + verify(mockGraphite).write(argument.capture()); + } catch (IOException e) { + e.printStackTrace(); } - @Test - public void testPutMetrics() { - GraphiteSink sink = new GraphiteSink(); - List tags = new ArrayList(); - tags.add(new MetricsTag(MsInfo.Context, "all")); - tags.add(new MetricsTag(MsInfo.Hostname, "host")); - Set metrics = new HashSet(); - metrics.add(makeMetric("foo1", 1.25)); - metrics.add(makeMetric("foo2", 2.25)); - MetricsRecord record = new MetricsRecordImpl(MsInfo.Context, (long) 10000, tags, metrics); - - ArgumentCaptor argument = ArgumentCaptor.forClass(String.class); - final GraphiteSink.Graphite mockGraphite = makeGraphite(); - Whitebox.setInternalState(sink, "graphite", mockGraphite); - sink.putMetrics(record); - - try { - verify(mockGraphite).write(argument.capture()); - } catch (IOException e) { - e.printStackTrace(); - } - - String result = argument.getValue(); - - assertEquals(true, - result.equals("null.all.Context.Context=all.Hostname=host.foo1 1.25 10\n" + - "null.all.Context.Context=all.Hostname=host.foo2 2.25 10\n") || - result.equals("null.all.Context.Context=all.Hostname=host.foo2 2.25 10\n" + - "null.all.Context.Context=all.Hostname=host.foo1 1.25 10\n")); + String result = argument.getValue(); + + assertEquals(true, + result.equals( + "null.all.Context.foo1;Context=all 1 10\n" + + "null.all.Context.foo2;Context=all 2 10\n") || + result.equals( + "null.all.Context.foo2;Context=all 2 10\n" + + "null.all.Context.foo1;Context=all 1 10\n")); + } + + /** + * Assert that timestamps are converted correctly, ticket HADOOP-11182. + */ + @SuppressWarnings("deprecation") + @Test + public void testPutMetrics3() { + + // setup GraphiteSink + GraphiteSink sink = new GraphiteSink(); + final GraphiteSink.Graphite mockGraphite = makeGraphite(); + Whitebox.setInternalState(sink, "graphite", mockGraphite); + + // given two metrics records with timestamps 1000 milliseconds apart. + List tags = Collections.emptyList(); + Set metrics = new HashSet(); + metrics.add(makeMetric("foo1", 1)); + MetricsRecord record1 = new MetricsRecordImpl(MsInfo.Context, + 1000000000000L, tags, metrics); + MetricsRecord record2 = new MetricsRecordImpl(MsInfo.Context, + 1000000001000L, tags, metrics); + + sink.putMetrics(record1); + sink.putMetrics(record2); + + sink.flush(); + try { + sink.close(); + } catch(IOException e) { + e.printStackTrace(); } - @Test - public void testPutMetrics2() { - GraphiteSink sink = new GraphiteSink(); - List tags = new ArrayList(); - tags.add(new MetricsTag(MsInfo.Context, "all")); - tags.add(new MetricsTag(MsInfo.Hostname, null)); - Set metrics = new HashSet(); - metrics.add(makeMetric("foo1", 1)); - metrics.add(makeMetric("foo2", 2)); - MetricsRecord record = new MetricsRecordImpl(MsInfo.Context, (long) 10000, tags, metrics); - - - ArgumentCaptor argument = ArgumentCaptor.forClass(String.class); - final GraphiteSink.Graphite mockGraphite = makeGraphite(); - Whitebox.setInternalState(sink, "graphite", mockGraphite); - sink.putMetrics(record); - - try { - verify(mockGraphite).write(argument.capture()); - } catch (IOException e) { - e.printStackTrace(); - } - - String result = argument.getValue(); - - assertEquals(true, - result.equals("null.all.Context.Context=all.foo1 1 10\n" + - "null.all.Context.Context=all.foo2 2 10\n") || - result.equals("null.all.Context.Context=all.foo2 2 10\n" + - "null.all.Context.Context=all.foo1 1 10\n")); + // then the timestamps in the graphite stream should differ by one second. + try { + verify(mockGraphite).write(eq( + "null.default.Context.foo1 1 1000000000\n")); + verify(mockGraphite).write(eq( + "null.default.Context.foo1 1 1000000001\n")); + } catch (IOException e) { + e.printStackTrace(); } - - /** - * Assert that timestamps are converted correctly, ticket HADOOP-11182 - */ - @Test - public void testPutMetrics3() { - - // setup GraphiteSink - GraphiteSink sink = new GraphiteSink(); - final GraphiteSink.Graphite mockGraphite = makeGraphite(); - Whitebox.setInternalState(sink, "graphite", mockGraphite); - - // given two metrics records with timestamps 1000 milliseconds apart. - List tags = Collections.emptyList(); - Set metrics = new HashSet(); - metrics.add(makeMetric("foo1", 1)); - MetricsRecord record1 = new MetricsRecordImpl(MsInfo.Context, 1000000000000L, tags, metrics); - MetricsRecord record2 = new MetricsRecordImpl(MsInfo.Context, 1000000001000L, tags, metrics); - - sink.putMetrics(record1); - sink.putMetrics(record2); - - sink.flush(); - try { - sink.close(); - } catch(IOException e) { - e.printStackTrace(); - } - - // then the timestamps in the graphite stream should differ by one second. - try { - verify(mockGraphite).write(eq("null.default.Context.foo1 1 1000000000\n")); - verify(mockGraphite).write(eq("null.default.Context.foo1 1 1000000001\n")); - } catch (IOException e) { - e.printStackTrace(); - } + } + + @SuppressWarnings("deprecation") + @Test + public void testFailureAndPutMetrics() throws IOException { + GraphiteSink sink = new GraphiteSink(); + List tags = new ArrayList(); + tags.add(new MetricsTag(MsInfo.Context, "all")); + tags.add(new MetricsTag(MsInfo.Hostname, "host")); + Set metrics = new HashSet(); + metrics.add(makeMetric("foo1", 1.25)); + metrics.add(makeMetric("foo2", 2.25)); + MetricsRecord record = new MetricsRecordImpl(MsInfo.Context, + (long) 10000, tags, metrics); + + final GraphiteSink.Graphite mockGraphite = makeGraphite(); + Whitebox.setInternalState(sink, "graphite", mockGraphite); + + // throw exception when first try + doThrow(new IOException("IO exception")) + .when(mockGraphite) + .write(anyString()); + + sink.putMetrics(record); + verify(mockGraphite).write(anyString()); + verify(mockGraphite).close(); + + // reset mock and try again + reset(mockGraphite); + when(mockGraphite.isConnected()).thenReturn(false); + + ArgumentCaptor argument = ArgumentCaptor.forClass(String.class); + sink.putMetrics(record); + + verify(mockGraphite).write(argument.capture()); + String result = argument.getValue(); + + assertEquals(true, + result.equals( + "null.all.Context.foo1;Context=all;Hostname=host 1.25 10\n" + + "null.all.Context.foo2;Context=all;Hostname=host 2.25 10\n") || + result.equals( + "null.all.Context.foo2;Context=all;Hostname=host 2.25 10\n" + + "null.all.Context.foo1;Context=all;Hostname=host 1.25 10\n")); + } + + @SuppressWarnings("deprecation") + @Test + public void testClose(){ + GraphiteSink sink = new GraphiteSink(); + final GraphiteSink.Graphite mockGraphite = makeGraphite(); + Whitebox.setInternalState(sink, "graphite", mockGraphite); + try { + sink.close(); + } catch (IOException ioe) { + ioe.printStackTrace(); } - @Test - public void testFailureAndPutMetrics() throws IOException { - GraphiteSink sink = new GraphiteSink(); - List tags = new ArrayList(); - tags.add(new MetricsTag(MsInfo.Context, "all")); - tags.add(new MetricsTag(MsInfo.Hostname, "host")); - Set metrics = new HashSet(); - metrics.add(makeMetric("foo1", 1.25)); - metrics.add(makeMetric("foo2", 2.25)); - MetricsRecord record = new MetricsRecordImpl(MsInfo.Context, (long) 10000, tags, metrics); - - final GraphiteSink.Graphite mockGraphite = makeGraphite(); - Whitebox.setInternalState(sink, "graphite", mockGraphite); - - // throw exception when first try - doThrow(new IOException("IO exception")).when(mockGraphite).write(anyString()); - - sink.putMetrics(record); - verify(mockGraphite).write(anyString()); + try { verify(mockGraphite).close(); - - // reset mock and try again - reset(mockGraphite); - when(mockGraphite.isConnected()).thenReturn(false); - - ArgumentCaptor argument = ArgumentCaptor.forClass(String.class); - sink.putMetrics(record); - - verify(mockGraphite).write(argument.capture()); - String result = argument.getValue(); - - assertEquals(true, - result.equals("null.all.Context.Context=all.Hostname=host.foo1 1.25 10\n" + - "null.all.Context.Context=all.Hostname=host.foo2 2.25 10\n") || - result.equals("null.all.Context.Context=all.Hostname=host.foo2 2.25 10\n" + - "null.all.Context.Context=all.Hostname=host.foo1 1.25 10\n")); - } - - @Test - public void testClose(){ - GraphiteSink sink = new GraphiteSink(); - final GraphiteSink.Graphite mockGraphite = makeGraphite(); - Whitebox.setInternalState(sink, "graphite", mockGraphite); - try { - sink.close(); - } catch (IOException ioe) { - ioe.printStackTrace(); - } - - try { - verify(mockGraphite).close(); - } catch (IOException ioe) { - ioe.printStackTrace(); - } + } catch (IOException ioe) { + ioe.printStackTrace(); } + } }