11package datadog .trace .common .metrics ;
22
33import static datadog .communication .ddagent .DDAgentFeaturesDiscovery .V6_METRICS_ENDPOINT ;
4+ import static datadog .trace .api .DDTags .BASE_SERVICE ;
45import static datadog .trace .api .Functions .UTF8_ENCODE ;
56import static datadog .trace .bootstrap .instrumentation .api .Tags .SPAN_KIND ;
7+ import static datadog .trace .bootstrap .instrumentation .api .Tags .SPAN_KIND_CLIENT ;
8+ import static datadog .trace .bootstrap .instrumentation .api .Tags .SPAN_KIND_CONSUMER ;
9+ import static datadog .trace .bootstrap .instrumentation .api .Tags .SPAN_KIND_INTERNAL ;
10+ import static datadog .trace .bootstrap .instrumentation .api .Tags .SPAN_KIND_PRODUCER ;
11+ import static datadog .trace .bootstrap .instrumentation .api .Tags .SPAN_KIND_SERVER ;
612import static datadog .trace .common .metrics .AggregateMetric .ERROR_TAG ;
713import static datadog .trace .common .metrics .AggregateMetric .TOP_LEVEL_TAG ;
814import static datadog .trace .common .metrics .SignalItem .ReportSignal .REPORT ;
915import static datadog .trace .common .metrics .SignalItem .StopSignal .STOP ;
1016import static datadog .trace .util .AgentThreadFactory .AgentThread .METRICS_AGGREGATOR ;
1117import static datadog .trace .util .AgentThreadFactory .THREAD_JOIN_TIMOUT_MS ;
1218import static datadog .trace .util .AgentThreadFactory .newAgentThread ;
19+ import static java .util .Collections .unmodifiableSet ;
1320import static java .util .concurrent .TimeUnit .SECONDS ;
1421
1522import datadog .communication .ddagent .DDAgentFeaturesDiscovery ;
1623import datadog .communication .ddagent .SharedCommunicationObjects ;
1724import datadog .trace .api .Config ;
25+ import datadog .trace .api .Pair ;
1826import datadog .trace .api .WellKnownTags ;
1927import datadog .trace .api .cache .DDCache ;
2028import datadog .trace .api .cache .DDCaches ;
2533import datadog .trace .core .DDTraceCoreInfo ;
2634import datadog .trace .core .monitor .HealthMetrics ;
2735import datadog .trace .util .AgentTaskScheduler ;
36+ import java .util .ArrayList ;
37+ import java .util .Arrays ;
2838import java .util .Collections ;
39+ import java .util .HashSet ;
2940import java .util .List ;
3041import java .util .Map ;
3142import java .util .Queue ;
3243import java .util .Set ;
3344import java .util .concurrent .CompletableFuture ;
3445import java .util .concurrent .Future ;
3546import java .util .concurrent .TimeUnit ;
47+ import java .util .function .Function ;
3648import org .jctools .maps .NonBlockingHashMap ;
3749import org .jctools .queues .MpscCompoundQueue ;
3850import org .jctools .queues .SpmcArrayQueue ;
@@ -49,8 +61,32 @@ public final class ConflatingMetricsAggregator implements MetricsAggregator, Eve
4961 private static final DDCache <String , UTF8BytesString > SERVICE_NAMES =
5062 DDCaches .newFixedSizeCache (32 );
5163
64+ private static final DDCache <CharSequence , UTF8BytesString > SPAN_KINDS =
65+ DDCaches .newFixedSizeCache (16 );
66+ private static final DDCache <
67+ String , Pair <DDCache <String , UTF8BytesString >, Function <String , UTF8BytesString >>>
68+ PEER_TAGS_CACHE =
69+ DDCaches .newFixedSizeCache (
70+ 64 ); // it can be unbounded since those values are returned by the agent and should be
71+ // under control. 64 entries is enough in this case to contain all the peer tags.
72+ private static final Function <
73+ String , Pair <DDCache <String , UTF8BytesString >, Function <String , UTF8BytesString >>>
74+ PEER_TAGS_CACHE_ADDER =
75+ key ->
76+ Pair .of (
77+ DDCaches .newFixedSizeCache (512 ),
78+ value -> UTF8BytesString .create (key + ":" + value ));
5279 private static final CharSequence SYNTHETICS_ORIGIN = "synthetics" ;
5380
81+ private static final Set <String > ELIGIBLE_SPAN_KINDS_FOR_METRICS =
82+ unmodifiableSet (
83+ new HashSet <>(
84+ Arrays .asList (
85+ SPAN_KIND_SERVER , SPAN_KIND_CLIENT , SPAN_KIND_CONSUMER , SPAN_KIND_PRODUCER )));
86+
87+ private static final Set <String > ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION =
88+ unmodifiableSet (new HashSet <>(Arrays .asList (SPAN_KIND_CLIENT , SPAN_KIND_PRODUCER )));
89+
5490 private final Set <String > ignoredResources ;
5591 private final Queue <Batch > batchPool ;
5692 private final NonBlockingHashMap <MetricKey , Batch > pending ;
@@ -262,18 +298,23 @@ private boolean shouldComputeMetric(CoreSpan<?> span) {
262298 private boolean spanKindEligible (CoreSpan <?> span ) {
263299 final Object spanKind = span .getTag (SPAN_KIND );
264300 // use toString since it could be a CharSequence...
265- return spanKind != null && features . spanKindsToComputedStats () .contains (spanKind .toString ());
301+ return spanKind != null && ELIGIBLE_SPAN_KINDS_FOR_METRICS .contains (spanKind .toString ());
266302 }
267303
268304 private boolean publish (CoreSpan <?> span , boolean isTopLevel ) {
305+ final CharSequence spanKind = span .getTag (SPAN_KIND , "" );
269306 MetricKey newKey =
270307 new MetricKey (
271308 span .getResourceName (),
272309 SERVICE_NAMES .computeIfAbsent (span .getServiceName (), UTF8_ENCODE ),
273310 span .getOperationName (),
274311 span .getType (),
275312 span .getHttpStatusCode (),
276- isSynthetic (span ));
313+ isSynthetic (span ),
314+ span .getParentId () == 0 ,
315+ SPAN_KINDS .computeIfAbsent (
316+ spanKind , UTF8BytesString ::create ), // save repeated utf8 conversions
317+ getPeerTags (span , spanKind .toString ()));
277318 boolean isNewKey = false ;
278319 MetricKey key = keys .putIfAbsent (newKey , newKey );
279320 if (null == key ) {
@@ -288,7 +329,7 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
288329 // returning false means that either the batch can't take any
289330 // more data, or it has already been consumed
290331 if (batch .add (tag , durationNanos )) {
291- // added to a pending batch prior to consumption
332+ // added to a pending batch prior to consumption,
292333 // so skip publishing to the queue (we also know
293334 // the key isn't rare enough to override the sampler)
294335 return false ;
@@ -308,6 +349,34 @@ private boolean publish(CoreSpan<?> span, boolean isTopLevel) {
308349 return isNewKey || span .getError () > 0 ;
309350 }
310351
352+ private List <UTF8BytesString > getPeerTags (CoreSpan <?> span , String spanKind ) {
353+ if (ELIGIBLE_SPAN_KINDS_FOR_PEER_AGGREGATION .contains (spanKind )) {
354+ List <UTF8BytesString > peerTags = new ArrayList <>();
355+ for (String peerTag : features .peerTags ()) {
356+ Object value = span .getTag (peerTag );
357+ if (value != null ) {
358+ final Pair <DDCache <String , UTF8BytesString >, Function <String , UTF8BytesString >>
359+ cacheAndCreator = PEER_TAGS_CACHE .computeIfAbsent (peerTag , PEER_TAGS_CACHE_ADDER );
360+ peerTags .add (
361+ cacheAndCreator
362+ .getLeft ()
363+ .computeIfAbsent (value .toString (), cacheAndCreator .getRight ()));
364+ }
365+ }
366+ return peerTags ;
367+ } else if (SPAN_KIND_INTERNAL .equals (spanKind )) {
368+ // in this case only the base service should be aggregated if present
369+ final String baseService = span .getTag (BASE_SERVICE );
370+ if (baseService != null ) {
371+ final Pair <DDCache <String , UTF8BytesString >, Function <String , UTF8BytesString >>
372+ cacheAndCreator = PEER_TAGS_CACHE .computeIfAbsent (BASE_SERVICE , PEER_TAGS_CACHE_ADDER );
373+ return Collections .singletonList (
374+ cacheAndCreator .getLeft ().computeIfAbsent (baseService , cacheAndCreator .getRight ()));
375+ }
376+ }
377+ return Collections .emptyList ();
378+ }
379+
311380 private static boolean isSynthetic (CoreSpan <?> span ) {
312381 return span .getOrigin () != null && SYNTHETICS_ORIGIN .equals (span .getOrigin ().toString ());
313382 }
0 commit comments