Skip to content

Commit f2ed94e

Browse files
authored
CAMEL-16866 add message history metric (#19984)
1 parent 8992706 commit f2ed94e

File tree

9 files changed

+748
-1
lines changed

9 files changed

+748
-1
lines changed

components/camel-opentelemetry-metrics/src/main/docs/opentelemetry-metrics-component.adoc

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -378,7 +378,6 @@ in case you only want to instrument a few selected routes.
378378

379379
Camel specific metrics that are available via the `OpenTelemetryRoutePolicy`:
380380
[width="100%",options="header"]
381-
[width="100%",options="header"]
382381
|=====================================================
383382
|Default Name |Type |Description
384383
| camel.route.policy | LongHistogram | Time taken to complete processing of exchanges for a route
@@ -402,6 +401,36 @@ The following options are supported:
402401
| longTaskTimeUnit | TimeUnit.MILLISECONDS | The time unit for the 'long task' timer that measures total duration of all active tasks.
403402
|=======================================================================
404403

404+
=== OpenTelemetry Message History Factory
405+
406+
`OpenTelemetryMessageHistoryFactory` allows the capture of Message History performance statistics while routing messages.
407+
It works by using an OpenTelemetry Timer to record the time taken to process each node in all routes.
408+
409+
The `OpenTelemetryMessageHistoryFactory` can be added to the `CamelContext`:
410+
411+
[source,java]
412+
----
413+
context.setMessageHistoryFactory(new OpenTelemetryMessageHistoryFactory());
414+
----
415+
416+
`OpenTelemetryMessageHistory` provides the following Camel specific metric:
417+
418+
[width="100%",options="header"]
419+
|=====================================================
420+
|Default Name |Type |Description
421+
|camel.message.history | LongHistogram | Time taken for a node to process the message
422+
|=====================================================
423+
424+
The following options are supported on the `OpenTelemetryMessageHistoryFactory`:
425+
426+
[width="100%",options="header"]
427+
|=======================================================================
428+
|Name |Default |Description
429+
|namingStrategy | OpenTelemetryHistoryNamingStrategy.DEFAULT | The strategy to use for overriding the default metric name.
430+
|nodePattern | | A pattern to filter which nodes to capture metrics for. By default, all nodes are included
431+
|durationUnit | TimeUnit.MILLISECONDS | The time unit for recording the processing time.
432+
|=======================================================================
433+
405434
== OpenTelemetry Configuration
406435

407436
Applications can export collected metrics to various backends using different exporters, including the OpenTelemetry Protocol (OTLP) exporter,

components/camel-opentelemetry-metrics/src/main/java/org/apache/camel/opentelemetry/metrics/OpenTelemetryConstants.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,9 +62,13 @@ public class OpenTelemetryConstants {
6262
public static final String DEFAULT_CAMEL_ROUTES_RUNNING = "camel.routes.running";
6363
public static final String DEFAULT_CAMEL_ROUTES_RELOADED = "camel.routes.reloaded";
6464

65+
// Message-history metric
66+
public static final String DEFAULT_CAMEL_MESSAGE_HISTORY_METER_NAME = "camel.message.history";
67+
6568
// OpenTelemetry Attribute keys
6669
public static final String CAMEL_CONTEXT_ATTRIBUTE = "camelContext";
6770
public static final String ROUTE_ID_ATTRIBUTE = "routeId";
71+
public static final String NODE_ID_ATTRIBUTE = "nodeId";
6872
public static final String FAILED_ATTRIBUTE = "failed";
6973
public static final String EVENT_TYPE_ATTRIBUTE = "eventType";
7074
public static final String KIND_ATTRIBUTE = "kind";
@@ -73,6 +77,7 @@ public class OpenTelemetryConstants {
7377
// OpenTelemetry Attribute values
7478
public static final String KIND_EXCHANGE = "CamelExchangeEvent";
7579
public static final String KIND_ROUTE = "CamelRoute";
80+
public static final String KIND_HISTORY = "CamelMessageHistory";
7681

7782
private OpenTelemetryConstants() {
7883
// no-op
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.camel.opentelemetry.metrics.messagehistory;
18+
19+
import io.opentelemetry.api.common.AttributeKey;
20+
import io.opentelemetry.api.common.Attributes;
21+
import org.apache.camel.NamedNode;
22+
import org.apache.camel.Route;
23+
24+
import static org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.CAMEL_CONTEXT_ATTRIBUTE;
25+
import static org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.DEFAULT_CAMEL_MESSAGE_HISTORY_METER_NAME;
26+
import static org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.KIND_ATTRIBUTE;
27+
import static org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.KIND_HISTORY;
28+
import static org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.NODE_ID_ATTRIBUTE;
29+
import static org.apache.camel.opentelemetry.metrics.OpenTelemetryConstants.ROUTE_ID_ATTRIBUTE;
30+
31+
public interface OpenTelemetryHistoryNamingStrategy {
32+
33+
OpenTelemetryHistoryNamingStrategy DEFAULT = () -> DEFAULT_CAMEL_MESSAGE_HISTORY_METER_NAME;
34+
35+
String getName();
36+
37+
default String formatName(String name) {
38+
return name;
39+
}
40+
41+
default Attributes getAttributes(Route route, NamedNode node) {
42+
return Attributes.of(
43+
AttributeKey.stringKey(CAMEL_CONTEXT_ATTRIBUTE), route.getCamelContext().getName(),
44+
AttributeKey.stringKey(KIND_ATTRIBUTE), KIND_HISTORY,
45+
AttributeKey.stringKey(ROUTE_ID_ATTRIBUTE), route.getId(),
46+
AttributeKey.stringKey(NODE_ID_ATTRIBUTE), node.getId());
47+
}
48+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.camel.opentelemetry.metrics.messagehistory;
18+
19+
import java.util.concurrent.TimeUnit;
20+
21+
import io.opentelemetry.api.metrics.LongHistogram;
22+
import org.apache.camel.Message;
23+
import org.apache.camel.NamedNode;
24+
import org.apache.camel.Route;
25+
import org.apache.camel.support.DefaultMessageHistory;
26+
27+
public class OpenTelemetryMessageHistory extends DefaultMessageHistory {
28+
29+
private final Route route;
30+
private final OpenTelemetryHistoryNamingStrategy namingStrategy;
31+
private final TimeUnit durationUnit;
32+
private final LongHistogram timer;
33+
34+
public OpenTelemetryMessageHistory(LongHistogram timer, TimeUnit durationUnit, Route route, NamedNode namedNode,
35+
OpenTelemetryHistoryNamingStrategy namingStrategy, Message message) {
36+
super(route.getId(), namedNode, message);
37+
this.timer = timer;
38+
this.durationUnit = durationUnit;
39+
this.route = route;
40+
this.namingStrategy = namingStrategy;
41+
}
42+
43+
@Override
44+
public void nodeProcessingDone() {
45+
super.nodeProcessingDone();
46+
this.timer.record(durationUnit.convert(getElapsed(), TimeUnit.MILLISECONDS),
47+
namingStrategy.getAttributes(route, getNode()));
48+
}
49+
50+
@Override
51+
public String toString() {
52+
return "OpenTelemetryMessageHistory[routeId=" + getRouteId() + ", node=" + getNode().getId() + ']';
53+
}
54+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.camel.opentelemetry.metrics.messagehistory;
18+
19+
import java.util.concurrent.TimeUnit;
20+
21+
import io.opentelemetry.api.GlobalOpenTelemetry;
22+
import io.opentelemetry.api.metrics.LongHistogram;
23+
import io.opentelemetry.api.metrics.Meter;
24+
import org.apache.camel.CamelContext;
25+
import org.apache.camel.CamelContextAware;
26+
import org.apache.camel.Exchange;
27+
import org.apache.camel.Message;
28+
import org.apache.camel.MessageHistory;
29+
import org.apache.camel.NamedNode;
30+
import org.apache.camel.NonManagedService;
31+
import org.apache.camel.Route;
32+
import org.apache.camel.RuntimeCamelException;
33+
import org.apache.camel.StaticService;
34+
import org.apache.camel.spi.MessageHistoryFactory;
35+
import org.apache.camel.support.CamelContextHelper;
36+
import org.apache.camel.support.PatternHelper;
37+
import org.apache.camel.support.service.ServiceSupport;
38+
39+
public class OpenTelemetryMessageHistoryFactory extends ServiceSupport
40+
implements CamelContextAware, StaticService, NonManagedService, MessageHistoryFactory {
41+
42+
private CamelContext camelContext;
43+
private Meter meter;
44+
private boolean copyMessage;
45+
private String nodePattern;
46+
private TimeUnit durationUnit = TimeUnit.MILLISECONDS;
47+
private OpenTelemetryHistoryNamingStrategy namingStrategy = OpenTelemetryHistoryNamingStrategy.DEFAULT;
48+
private LongHistogram timer;
49+
50+
@Override
51+
public CamelContext getCamelContext() {
52+
return camelContext;
53+
}
54+
55+
@Override
56+
public void setCamelContext(CamelContext camelContext) {
57+
this.camelContext = camelContext;
58+
}
59+
60+
public Meter getMeter() {
61+
return meter;
62+
}
63+
64+
/**
65+
* Sets the OpenTelemetry meter to use for recording metrics.
66+
*/
67+
public void setMeter(Meter meter) {
68+
this.meter = meter;
69+
}
70+
71+
public TimeUnit getDurationUnit() {
72+
return durationUnit;
73+
}
74+
75+
/**
76+
* Sets the time unit to use for timing the duration of processing a message in the route.
77+
*/
78+
public void setDurationUnit(TimeUnit durationUnit) {
79+
this.durationUnit = durationUnit;
80+
}
81+
82+
public OpenTelemetryHistoryNamingStrategy getNamingStrategy() {
83+
return namingStrategy;
84+
}
85+
86+
/**
87+
* Sets the naming strategy for message history meter names.
88+
*/
89+
public void setNamingStrategy(OpenTelemetryHistoryNamingStrategy namingStrategy) {
90+
this.namingStrategy = namingStrategy;
91+
}
92+
93+
@Override
94+
public boolean isCopyMessage() {
95+
return copyMessage;
96+
}
97+
98+
@Override
99+
public void setCopyMessage(boolean copyMessage) {
100+
this.copyMessage = copyMessage;
101+
}
102+
103+
@Override
104+
public String getNodePattern() {
105+
return nodePattern;
106+
}
107+
108+
@Override
109+
public void setNodePattern(String nodePattern) {
110+
this.nodePattern = nodePattern;
111+
}
112+
113+
@Override
114+
public MessageHistory newMessageHistory(String routeId, NamedNode namedNode, Exchange exchange) {
115+
if (nodePattern != null) {
116+
String name = namedNode.getShortName();
117+
String[] parts = nodePattern.split(",");
118+
boolean match = false;
119+
for (String part : parts) {
120+
if (PatternHelper.matchPattern(name, part)) {
121+
match = true;
122+
break;
123+
}
124+
}
125+
// no match on any part
126+
if (!match) {
127+
return null;
128+
}
129+
}
130+
131+
Message msg = null;
132+
if (copyMessage) {
133+
msg = exchange.getMessage().copy();
134+
}
135+
136+
Route route = camelContext.getRoute(routeId);
137+
if (route != null) {
138+
return new OpenTelemetryMessageHistory(timer, getDurationUnit(), route, namedNode, getNamingStrategy(), msg);
139+
} else {
140+
return null;
141+
}
142+
}
143+
144+
@Override
145+
protected void doInit() throws Exception {
146+
super.doInit();
147+
if (meter == null) {
148+
this.meter = CamelContextHelper.findSingleByType(getCamelContext(), Meter.class);
149+
}
150+
if (meter == null) {
151+
this.meter = GlobalOpenTelemetry.get().getMeter("camel");
152+
}
153+
if (meter == null) {
154+
throw new RuntimeCamelException("Could not find any OpenTelemetry meter!");
155+
}
156+
}
157+
158+
@Override
159+
protected void doStart() throws Exception {
160+
super.doStart();
161+
this.timer = meter
162+
.histogramBuilder(getNamingStrategy().getName())
163+
.ofLongs()
164+
.setDescription("Node performance metrics")
165+
.setUnit(durationUnit.name().toLowerCase())
166+
.build();
167+
}
168+
}

0 commit comments

Comments
 (0)