From f772a4cde5db685e500db782ce49e9bb814127a6 Mon Sep 17 00:00:00 2001 From: Igor Kravchenko Date: Wed, 9 Jul 2025 11:59:45 -0500 Subject: [PATCH 1/3] Avoid long to string conversion --- .../core/datastreams/DefaultPathwayContext.java | 17 ++++++++++++++++- 1 file changed, 16 insertions(+), 1 deletion(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java index a1858eab9cd..dd0ed1c3000 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java @@ -168,7 +168,7 @@ public void setCheckpoint(DataStreamsContext context, Consumer point } long newHash = generatePathwayHash(nodeHash, hash); - long aggregationHash = aggregationHashBuilder.addValue(String.valueOf(newHash)); + long aggregationHash = aggregationHashBuilder.addValue(newHash); long pathwayLatencyNano = nanoTicks - pathwayStartNanoTicks; long edgeLatencyNano = nanoTicks - edgeStartNanoTicks; @@ -334,6 +334,21 @@ public long addValue(String val) { currentHash = FNV64Hash.generateHash(currentHash + val, FNV64Hash.Version.v1); return currentHash; } + + public long addValue(long val) { + byte[] b = new byte[] { + (byte) val, + (byte) (val >> 8), + (byte) (val >> 16), + (byte) (val >> 24), + (byte) (val >> 32), + (byte) (val >> 40), + (byte) (val >> 48), + (byte) (val >> 56)}; + + currentHash = FNV64Hash.continueHash(currentHash, b, FNV64Hash.Version.v1); + return currentHash; + } } private static class PathwayHashBuilder { From 918da022c73007af157306c8f3896ab1b90f17b2 Mon Sep 17 00:00:00 2001 From: Igor Kravchenko Date: Wed, 9 Jul 2025 12:17:16 -0500 Subject: [PATCH 2/3] Spotless apply --- .../datastreams/DefaultPathwayContext.java | 20 ++++++++++--------- 1 file changed, 11 insertions(+), 9 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java index dd0ed1c3000..0988821aada 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java @@ -336,15 +336,17 @@ public long addValue(String val) { } public long addValue(long val) { - byte[] b = new byte[] { - (byte) val, - (byte) (val >> 8), - (byte) (val >> 16), - (byte) (val >> 24), - (byte) (val >> 32), - (byte) (val >> 40), - (byte) (val >> 48), - (byte) (val >> 56)}; + byte[] b = + new byte[] { + (byte) val, + (byte) (val >> 8), + (byte) (val >> 16), + (byte) (val >> 24), + (byte) (val >> 32), + (byte) (val >> 40), + (byte) (val >> 48), + (byte) (val >> 56) + }; currentHash = FNV64Hash.continueHash(currentHash, b, FNV64Hash.Version.v1); return currentHash; From 70ca45a2bd87c3e87255aeba0997c311c9cbfef8 Mon Sep 17 00:00:00 2001 From: Igor Kravchenko Date: Wed, 9 Jul 2025 13:13:03 -0500 Subject: [PATCH 3/3] Use synchronized --- .../datastreams/DefaultPathwayContext.java | 219 ++++++++---------- 1 file changed, 101 insertions(+), 118 deletions(-) diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java index 0988821aada..3d5a5266bf6 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java @@ -26,8 +26,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; import java.util.function.Consumer; import org.slf4j.Logger; @@ -35,7 +33,6 @@ public class DefaultPathwayContext implements PathwayContext { private static final Logger log = LoggerFactory.getLogger(DefaultPathwayContext.class); - private final Lock lock = new ReentrantLock(); private final long hashOfKnownTags; private final TimeSource timeSource; private final String serviceNameOverride; @@ -107,91 +104,87 @@ public long getHash() { } @Override - public void setCheckpoint(DataStreamsContext context, Consumer pointConsumer) { + public synchronized void setCheckpoint( + DataStreamsContext context, Consumer pointConsumer) { long startNanos = timeSource.getCurrentTimeNanos(); long nanoTicks = timeSource.getNanoTicks(); - lock.lock(); - try { - // So far, each tag key has only one tag value, so we're initializing the capacity to match - // the number of tag keys for now. We should revisit this later if it's no longer the case. - LinkedHashMap sortedTags = context.sortedTags(); - List allTags = new ArrayList<>(sortedTags.size()); - PathwayHashBuilder pathwayHashBuilder = - new PathwayHashBuilder(hashOfKnownTags, serviceNameOverride); - DataSetHashBuilder aggregationHashBuilder = new DataSetHashBuilder(); - - if (!started) { - long defaultTimestamp = context.defaultTimestamp(); - if (defaultTimestamp == 0) { - pathwayStartNanos = startNanos; - pathwayStartNanoTicks = nanoTicks; - edgeStartNanoTicks = nanoTicks; - } else { - pathwayStartNanos = MILLISECONDS.toNanos(defaultTimestamp); - pathwayStartNanoTicks = - nanoTicks - - MILLISECONDS.toNanos(timeSource.getCurrentTimeMillis() - defaultTimestamp); - edgeStartNanoTicks = pathwayStartNanoTicks; - } - hash = 0; - started = true; - log.debug("Started {}", this); + // So far, each tag key has only one tag value, so we're initializing the capacity to match + // the number of tag keys for now. We should revisit this later if it's no longer the case. + LinkedHashMap sortedTags = context.sortedTags(); + List allTags = new ArrayList<>(sortedTags.size()); + PathwayHashBuilder pathwayHashBuilder = + new PathwayHashBuilder(hashOfKnownTags, serviceNameOverride); + DataSetHashBuilder aggregationHashBuilder = new DataSetHashBuilder(); + + if (!started) { + long defaultTimestamp = context.defaultTimestamp(); + if (defaultTimestamp == 0) { + pathwayStartNanos = startNanos; + pathwayStartNanoTicks = nanoTicks; + edgeStartNanoTicks = nanoTicks; + } else { + pathwayStartNanos = MILLISECONDS.toNanos(defaultTimestamp); + pathwayStartNanoTicks = + nanoTicks - MILLISECONDS.toNanos(timeSource.getCurrentTimeMillis() - defaultTimestamp); + edgeStartNanoTicks = pathwayStartNanoTicks; } - for (Map.Entry entry : sortedTags.entrySet()) { - String tag = TagsProcessor.createTag(entry.getKey(), entry.getValue()); - if (tag == null) { - continue; - } - if (hashableTagKeys.contains(entry.getKey())) { - pathwayHashBuilder.addTag(tag); - } - if (extraAggregationTagKeys.contains(entry.getKey())) { - aggregationHashBuilder.addValue(tag); - } - allTags.add(tag); - } + hash = 0; + started = true; + log.debug("Started {}", this); + } - long nodeHash = generateNodeHash(pathwayHashBuilder); - // loop protection - a node should not be chosen as parent - // for a sequential node with the same direction, as this - // will cause a `cardinality explosion` for hash / parentHash tag values - if (sortedTags.containsKey(TagsProcessor.DIRECTION_TAG)) { - String direction = sortedTags.get(TagsProcessor.DIRECTION_TAG); - if (direction.equals(previousDirection)) { - hash = closestOppositeDirectionHash; - } else { - previousDirection = direction; - closestOppositeDirectionHash = hash; - } + for (Map.Entry entry : sortedTags.entrySet()) { + String tag = TagsProcessor.createTag(entry.getKey(), entry.getValue()); + if (tag == null) { + continue; + } + if (hashableTagKeys.contains(entry.getKey())) { + pathwayHashBuilder.addTag(tag); } + if (extraAggregationTagKeys.contains(entry.getKey())) { + aggregationHashBuilder.addValue(tag); + } + allTags.add(tag); + } - long newHash = generatePathwayHash(nodeHash, hash); - long aggregationHash = aggregationHashBuilder.addValue(newHash); - - long pathwayLatencyNano = nanoTicks - pathwayStartNanoTicks; - long edgeLatencyNano = nanoTicks - edgeStartNanoTicks; - - StatsPoint point = - new StatsPoint( - allTags, - newHash, - hash, - aggregationHash, - startNanos, - pathwayLatencyNano, - edgeLatencyNano, - context.payloadSizeBytes(), - serviceNameOverride); - edgeStartNanoTicks = nanoTicks; - hash = newHash; - - pointConsumer.accept(point); - log.debug("Checkpoint set {}, hash source: {}", this, pathwayHashBuilder); - } finally { - lock.unlock(); + long nodeHash = generateNodeHash(pathwayHashBuilder); + // loop protection - a node should not be chosen as parent + // for a sequential node with the same direction, as this + // will cause a `cardinality explosion` for hash / parentHash tag values + if (sortedTags.containsKey(TagsProcessor.DIRECTION_TAG)) { + String direction = sortedTags.get(TagsProcessor.DIRECTION_TAG); + if (direction.equals(previousDirection)) { + hash = closestOppositeDirectionHash; + } else { + previousDirection = direction; + closestOppositeDirectionHash = hash; + } } + + long newHash = generatePathwayHash(nodeHash, hash); + long aggregationHash = aggregationHashBuilder.addValue(newHash); + + long pathwayLatencyNano = nanoTicks - pathwayStartNanoTicks; + long edgeLatencyNano = nanoTicks - edgeStartNanoTicks; + + StatsPoint point = + new StatsPoint( + allTags, + newHash, + hash, + aggregationHash, + startNanos, + pathwayLatencyNano, + edgeLatencyNano, + context.payloadSizeBytes(), + serviceNameOverride); + edgeStartNanoTicks = nanoTicks; + hash = newHash; + + pointConsumer.accept(point); + log.debug("Checkpoint set {}, hash source: {}", this, pathwayHashBuilder); } @Override @@ -205,52 +198,42 @@ public StatsPoint getSavedStats() { } @Override - public String encode() throws IOException { - lock.lock(); - try { - if (!started) { - throw new IllegalStateException("Context must be started to encode"); - } + public synchronized String encode() throws IOException { + if (!started) { + throw new IllegalStateException("Context must be started to encode"); + } - outputBuffer.clear(); - outputBuffer.writeLongLE(hash); + outputBuffer.clear(); + outputBuffer.writeLongLE(hash); - long pathwayStartMillis = TimeUnit.NANOSECONDS.toMillis(pathwayStartNanos); - VarEncodingHelper.encodeSignedVarLong(outputBuffer, pathwayStartMillis); + long pathwayStartMillis = TimeUnit.NANOSECONDS.toMillis(pathwayStartNanos); + VarEncodingHelper.encodeSignedVarLong(outputBuffer, pathwayStartMillis); - long edgeStartMillis = - pathwayStartMillis - + TimeUnit.NANOSECONDS.toMillis(edgeStartNanoTicks - pathwayStartNanoTicks); + long edgeStartMillis = + pathwayStartMillis + + TimeUnit.NANOSECONDS.toMillis(edgeStartNanoTicks - pathwayStartNanoTicks); - VarEncodingHelper.encodeSignedVarLong(outputBuffer, edgeStartMillis); - byte[] base64 = Base64.getEncoder().encode(outputBuffer.trimmedCopy()); - return new String(base64, ISO_8859_1); - } finally { - lock.unlock(); - } + VarEncodingHelper.encodeSignedVarLong(outputBuffer, edgeStartMillis); + byte[] base64 = Base64.getEncoder().encode(outputBuffer.trimmedCopy()); + return new String(base64, ISO_8859_1); } @Override - public String toString() { - lock.lock(); - try { - if (started) { - return "PathwayContext[ Hash " - + Long.toUnsignedString(hash) - + ", Start: " - + pathwayStartNanos - + ", StartTicks: " - + pathwayStartNanoTicks - + ", Edge Start Ticks: " - + edgeStartNanoTicks - + ", objectHashcode:" - + hashCode() - + "]"; - } else { - return "PathwayContext [Not Started]"; - } - } finally { - lock.unlock(); + public synchronized String toString() { + if (started) { + return "PathwayContext[ Hash " + + Long.toUnsignedString(hash) + + ", Start: " + + pathwayStartNanos + + ", StartTicks: " + + pathwayStartNanoTicks + + ", Edge Start Ticks: " + + edgeStartNanoTicks + + ", objectHashcode:" + + hashCode() + + "]"; + } else { + return "PathwayContext [Not Started]"; } }