diff --git a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java index a1858eab9cd..3d5a5266bf6 100644 --- a/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java +++ b/dd-trace-core/src/main/java/datadog/trace/core/datastreams/DefaultPathwayContext.java @@ -26,8 +26,6 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; import java.util.function.BiConsumer; import java.util.function.Consumer; import org.slf4j.Logger; @@ -35,7 +33,6 @@ public class DefaultPathwayContext implements PathwayContext { private static final Logger log = LoggerFactory.getLogger(DefaultPathwayContext.class); - private final Lock lock = new ReentrantLock(); private final long hashOfKnownTags; private final TimeSource timeSource; private final String serviceNameOverride; @@ -107,91 +104,87 @@ public long getHash() { } @Override - public void setCheckpoint(DataStreamsContext context, Consumer pointConsumer) { + public synchronized void setCheckpoint( + DataStreamsContext context, Consumer pointConsumer) { long startNanos = timeSource.getCurrentTimeNanos(); long nanoTicks = timeSource.getNanoTicks(); - lock.lock(); - try { - // So far, each tag key has only one tag value, so we're initializing the capacity to match - // the number of tag keys for now. We should revisit this later if it's no longer the case. - LinkedHashMap sortedTags = context.sortedTags(); - List allTags = new ArrayList<>(sortedTags.size()); - PathwayHashBuilder pathwayHashBuilder = - new PathwayHashBuilder(hashOfKnownTags, serviceNameOverride); - DataSetHashBuilder aggregationHashBuilder = new DataSetHashBuilder(); - - if (!started) { - long defaultTimestamp = context.defaultTimestamp(); - if (defaultTimestamp == 0) { - pathwayStartNanos = startNanos; - pathwayStartNanoTicks = nanoTicks; - edgeStartNanoTicks = nanoTicks; - } else { - pathwayStartNanos = MILLISECONDS.toNanos(defaultTimestamp); - pathwayStartNanoTicks = - nanoTicks - - MILLISECONDS.toNanos(timeSource.getCurrentTimeMillis() - defaultTimestamp); - edgeStartNanoTicks = pathwayStartNanoTicks; - } - hash = 0; - started = true; - log.debug("Started {}", this); + // So far, each tag key has only one tag value, so we're initializing the capacity to match + // the number of tag keys for now. We should revisit this later if it's no longer the case. + LinkedHashMap sortedTags = context.sortedTags(); + List allTags = new ArrayList<>(sortedTags.size()); + PathwayHashBuilder pathwayHashBuilder = + new PathwayHashBuilder(hashOfKnownTags, serviceNameOverride); + DataSetHashBuilder aggregationHashBuilder = new DataSetHashBuilder(); + + if (!started) { + long defaultTimestamp = context.defaultTimestamp(); + if (defaultTimestamp == 0) { + pathwayStartNanos = startNanos; + pathwayStartNanoTicks = nanoTicks; + edgeStartNanoTicks = nanoTicks; + } else { + pathwayStartNanos = MILLISECONDS.toNanos(defaultTimestamp); + pathwayStartNanoTicks = + nanoTicks - MILLISECONDS.toNanos(timeSource.getCurrentTimeMillis() - defaultTimestamp); + edgeStartNanoTicks = pathwayStartNanoTicks; } - for (Map.Entry entry : sortedTags.entrySet()) { - String tag = TagsProcessor.createTag(entry.getKey(), entry.getValue()); - if (tag == null) { - continue; - } - if (hashableTagKeys.contains(entry.getKey())) { - pathwayHashBuilder.addTag(tag); - } - if (extraAggregationTagKeys.contains(entry.getKey())) { - aggregationHashBuilder.addValue(tag); - } - allTags.add(tag); - } + hash = 0; + started = true; + log.debug("Started {}", this); + } - long nodeHash = generateNodeHash(pathwayHashBuilder); - // loop protection - a node should not be chosen as parent - // for a sequential node with the same direction, as this - // will cause a `cardinality explosion` for hash / parentHash tag values - if (sortedTags.containsKey(TagsProcessor.DIRECTION_TAG)) { - String direction = sortedTags.get(TagsProcessor.DIRECTION_TAG); - if (direction.equals(previousDirection)) { - hash = closestOppositeDirectionHash; - } else { - previousDirection = direction; - closestOppositeDirectionHash = hash; - } + for (Map.Entry entry : sortedTags.entrySet()) { + String tag = TagsProcessor.createTag(entry.getKey(), entry.getValue()); + if (tag == null) { + continue; + } + if (hashableTagKeys.contains(entry.getKey())) { + pathwayHashBuilder.addTag(tag); } + if (extraAggregationTagKeys.contains(entry.getKey())) { + aggregationHashBuilder.addValue(tag); + } + allTags.add(tag); + } - long newHash = generatePathwayHash(nodeHash, hash); - long aggregationHash = aggregationHashBuilder.addValue(String.valueOf(newHash)); - - long pathwayLatencyNano = nanoTicks - pathwayStartNanoTicks; - long edgeLatencyNano = nanoTicks - edgeStartNanoTicks; - - StatsPoint point = - new StatsPoint( - allTags, - newHash, - hash, - aggregationHash, - startNanos, - pathwayLatencyNano, - edgeLatencyNano, - context.payloadSizeBytes(), - serviceNameOverride); - edgeStartNanoTicks = nanoTicks; - hash = newHash; - - pointConsumer.accept(point); - log.debug("Checkpoint set {}, hash source: {}", this, pathwayHashBuilder); - } finally { - lock.unlock(); + long nodeHash = generateNodeHash(pathwayHashBuilder); + // loop protection - a node should not be chosen as parent + // for a sequential node with the same direction, as this + // will cause a `cardinality explosion` for hash / parentHash tag values + if (sortedTags.containsKey(TagsProcessor.DIRECTION_TAG)) { + String direction = sortedTags.get(TagsProcessor.DIRECTION_TAG); + if (direction.equals(previousDirection)) { + hash = closestOppositeDirectionHash; + } else { + previousDirection = direction; + closestOppositeDirectionHash = hash; + } } + + long newHash = generatePathwayHash(nodeHash, hash); + long aggregationHash = aggregationHashBuilder.addValue(newHash); + + long pathwayLatencyNano = nanoTicks - pathwayStartNanoTicks; + long edgeLatencyNano = nanoTicks - edgeStartNanoTicks; + + StatsPoint point = + new StatsPoint( + allTags, + newHash, + hash, + aggregationHash, + startNanos, + pathwayLatencyNano, + edgeLatencyNano, + context.payloadSizeBytes(), + serviceNameOverride); + edgeStartNanoTicks = nanoTicks; + hash = newHash; + + pointConsumer.accept(point); + log.debug("Checkpoint set {}, hash source: {}", this, pathwayHashBuilder); } @Override @@ -205,52 +198,42 @@ public StatsPoint getSavedStats() { } @Override - public String encode() throws IOException { - lock.lock(); - try { - if (!started) { - throw new IllegalStateException("Context must be started to encode"); - } + public synchronized String encode() throws IOException { + if (!started) { + throw new IllegalStateException("Context must be started to encode"); + } - outputBuffer.clear(); - outputBuffer.writeLongLE(hash); + outputBuffer.clear(); + outputBuffer.writeLongLE(hash); - long pathwayStartMillis = TimeUnit.NANOSECONDS.toMillis(pathwayStartNanos); - VarEncodingHelper.encodeSignedVarLong(outputBuffer, pathwayStartMillis); + long pathwayStartMillis = TimeUnit.NANOSECONDS.toMillis(pathwayStartNanos); + VarEncodingHelper.encodeSignedVarLong(outputBuffer, pathwayStartMillis); - long edgeStartMillis = - pathwayStartMillis - + TimeUnit.NANOSECONDS.toMillis(edgeStartNanoTicks - pathwayStartNanoTicks); + long edgeStartMillis = + pathwayStartMillis + + TimeUnit.NANOSECONDS.toMillis(edgeStartNanoTicks - pathwayStartNanoTicks); - VarEncodingHelper.encodeSignedVarLong(outputBuffer, edgeStartMillis); - byte[] base64 = Base64.getEncoder().encode(outputBuffer.trimmedCopy()); - return new String(base64, ISO_8859_1); - } finally { - lock.unlock(); - } + VarEncodingHelper.encodeSignedVarLong(outputBuffer, edgeStartMillis); + byte[] base64 = Base64.getEncoder().encode(outputBuffer.trimmedCopy()); + return new String(base64, ISO_8859_1); } @Override - public String toString() { - lock.lock(); - try { - if (started) { - return "PathwayContext[ Hash " - + Long.toUnsignedString(hash) - + ", Start: " - + pathwayStartNanos - + ", StartTicks: " - + pathwayStartNanoTicks - + ", Edge Start Ticks: " - + edgeStartNanoTicks - + ", objectHashcode:" - + hashCode() - + "]"; - } else { - return "PathwayContext [Not Started]"; - } - } finally { - lock.unlock(); + public synchronized String toString() { + if (started) { + return "PathwayContext[ Hash " + + Long.toUnsignedString(hash) + + ", Start: " + + pathwayStartNanos + + ", StartTicks: " + + pathwayStartNanoTicks + + ", Edge Start Ticks: " + + edgeStartNanoTicks + + ", objectHashcode:" + + hashCode() + + "]"; + } else { + return "PathwayContext [Not Started]"; } } @@ -334,6 +317,23 @@ public long addValue(String val) { currentHash = FNV64Hash.generateHash(currentHash + val, FNV64Hash.Version.v1); return currentHash; } + + public long addValue(long val) { + byte[] b = + new byte[] { + (byte) val, + (byte) (val >> 8), + (byte) (val >> 16), + (byte) (val >> 24), + (byte) (val >> 32), + (byte) (val >> 40), + (byte) (val >> 48), + (byte) (val >> 56) + }; + + currentHash = FNV64Hash.continueHash(currentHash, b, FNV64Hash.Version.v1); + return currentHash; + } } private static class PathwayHashBuilder {