2626import java .util .Map ;
2727import java .util .Set ;
2828import java .util .concurrent .TimeUnit ;
29- import java .util .concurrent .locks .Lock ;
30- import java .util .concurrent .locks .ReentrantLock ;
3129import java .util .function .BiConsumer ;
3230import java .util .function .Consumer ;
3331import org .slf4j .Logger ;
3432import org .slf4j .LoggerFactory ;
3533
3634public class DefaultPathwayContext implements PathwayContext {
3735 private static final Logger log = LoggerFactory .getLogger (DefaultPathwayContext .class );
38- private final Lock lock = new ReentrantLock ();
3936 private final long hashOfKnownTags ;
4037 private final TimeSource timeSource ;
4138 private final String serviceNameOverride ;
@@ -107,91 +104,87 @@ public long getHash() {
107104 }
108105
109106 @ Override
110- public void setCheckpoint (DataStreamsContext context , Consumer <StatsPoint > pointConsumer ) {
107+ public synchronized void setCheckpoint (
108+ DataStreamsContext context , Consumer <StatsPoint > pointConsumer ) {
111109 long startNanos = timeSource .getCurrentTimeNanos ();
112110 long nanoTicks = timeSource .getNanoTicks ();
113- lock .lock ();
114- try {
115- // So far, each tag key has only one tag value, so we're initializing the capacity to match
116- // the number of tag keys for now. We should revisit this later if it's no longer the case.
117- LinkedHashMap <String , String > sortedTags = context .sortedTags ();
118- List <String > allTags = new ArrayList <>(sortedTags .size ());
119- PathwayHashBuilder pathwayHashBuilder =
120- new PathwayHashBuilder (hashOfKnownTags , serviceNameOverride );
121- DataSetHashBuilder aggregationHashBuilder = new DataSetHashBuilder ();
122-
123- if (!started ) {
124- long defaultTimestamp = context .defaultTimestamp ();
125- if (defaultTimestamp == 0 ) {
126- pathwayStartNanos = startNanos ;
127- pathwayStartNanoTicks = nanoTicks ;
128- edgeStartNanoTicks = nanoTicks ;
129- } else {
130- pathwayStartNanos = MILLISECONDS .toNanos (defaultTimestamp );
131- pathwayStartNanoTicks =
132- nanoTicks
133- - MILLISECONDS .toNanos (timeSource .getCurrentTimeMillis () - defaultTimestamp );
134- edgeStartNanoTicks = pathwayStartNanoTicks ;
135- }
136111
137- hash = 0 ;
138- started = true ;
139- log .debug ("Started {}" , this );
112+ // So far, each tag key has only one tag value, so we're initializing the capacity to match
113+ // the number of tag keys for now. We should revisit this later if it's no longer the case.
114+ LinkedHashMap <String , String > sortedTags = context .sortedTags ();
115+ List <String > allTags = new ArrayList <>(sortedTags .size ());
116+ PathwayHashBuilder pathwayHashBuilder =
117+ new PathwayHashBuilder (hashOfKnownTags , serviceNameOverride );
118+ DataSetHashBuilder aggregationHashBuilder = new DataSetHashBuilder ();
119+
120+ if (!started ) {
121+ long defaultTimestamp = context .defaultTimestamp ();
122+ if (defaultTimestamp == 0 ) {
123+ pathwayStartNanos = startNanos ;
124+ pathwayStartNanoTicks = nanoTicks ;
125+ edgeStartNanoTicks = nanoTicks ;
126+ } else {
127+ pathwayStartNanos = MILLISECONDS .toNanos (defaultTimestamp );
128+ pathwayStartNanoTicks =
129+ nanoTicks - MILLISECONDS .toNanos (timeSource .getCurrentTimeMillis () - defaultTimestamp );
130+ edgeStartNanoTicks = pathwayStartNanoTicks ;
140131 }
141132
142- for (Map .Entry <String , String > entry : sortedTags .entrySet ()) {
143- String tag = TagsProcessor .createTag (entry .getKey (), entry .getValue ());
144- if (tag == null ) {
145- continue ;
146- }
147- if (hashableTagKeys .contains (entry .getKey ())) {
148- pathwayHashBuilder .addTag (tag );
149- }
150- if (extraAggregationTagKeys .contains (entry .getKey ())) {
151- aggregationHashBuilder .addValue (tag );
152- }
153- allTags .add (tag );
154- }
133+ hash = 0 ;
134+ started = true ;
135+ log .debug ("Started {}" , this );
136+ }
155137
156- long nodeHash = generateNodeHash (pathwayHashBuilder );
157- // loop protection - a node should not be chosen as parent
158- // for a sequential node with the same direction, as this
159- // will cause a `cardinality explosion` for hash / parentHash tag values
160- if (sortedTags .containsKey (TagsProcessor .DIRECTION_TAG )) {
161- String direction = sortedTags .get (TagsProcessor .DIRECTION_TAG );
162- if (direction .equals (previousDirection )) {
163- hash = closestOppositeDirectionHash ;
164- } else {
165- previousDirection = direction ;
166- closestOppositeDirectionHash = hash ;
167- }
138+ for (Map .Entry <String , String > entry : sortedTags .entrySet ()) {
139+ String tag = TagsProcessor .createTag (entry .getKey (), entry .getValue ());
140+ if (tag == null ) {
141+ continue ;
142+ }
143+ if (hashableTagKeys .contains (entry .getKey ())) {
144+ pathwayHashBuilder .addTag (tag );
168145 }
146+ if (extraAggregationTagKeys .contains (entry .getKey ())) {
147+ aggregationHashBuilder .addValue (tag );
148+ }
149+ allTags .add (tag );
150+ }
169151
170- long newHash = generatePathwayHash (nodeHash , hash );
171- long aggregationHash = aggregationHashBuilder .addValue (String .valueOf (newHash ));
172-
173- long pathwayLatencyNano = nanoTicks - pathwayStartNanoTicks ;
174- long edgeLatencyNano = nanoTicks - edgeStartNanoTicks ;
175-
176- StatsPoint point =
177- new StatsPoint (
178- allTags ,
179- newHash ,
180- hash ,
181- aggregationHash ,
182- startNanos ,
183- pathwayLatencyNano ,
184- edgeLatencyNano ,
185- context .payloadSizeBytes (),
186- serviceNameOverride );
187- edgeStartNanoTicks = nanoTicks ;
188- hash = newHash ;
189-
190- pointConsumer .accept (point );
191- log .debug ("Checkpoint set {}, hash source: {}" , this , pathwayHashBuilder );
192- } finally {
193- lock .unlock ();
152+ long nodeHash = generateNodeHash (pathwayHashBuilder );
153+ // loop protection - a node should not be chosen as parent
154+ // for a sequential node with the same direction, as this
155+ // will cause a `cardinality explosion` for hash / parentHash tag values
156+ if (sortedTags .containsKey (TagsProcessor .DIRECTION_TAG )) {
157+ String direction = sortedTags .get (TagsProcessor .DIRECTION_TAG );
158+ if (direction .equals (previousDirection )) {
159+ hash = closestOppositeDirectionHash ;
160+ } else {
161+ previousDirection = direction ;
162+ closestOppositeDirectionHash = hash ;
163+ }
194164 }
165+
166+ long newHash = generatePathwayHash (nodeHash , hash );
167+ long aggregationHash = aggregationHashBuilder .addValue (newHash );
168+
169+ long pathwayLatencyNano = nanoTicks - pathwayStartNanoTicks ;
170+ long edgeLatencyNano = nanoTicks - edgeStartNanoTicks ;
171+
172+ StatsPoint point =
173+ new StatsPoint (
174+ allTags ,
175+ newHash ,
176+ hash ,
177+ aggregationHash ,
178+ startNanos ,
179+ pathwayLatencyNano ,
180+ edgeLatencyNano ,
181+ context .payloadSizeBytes (),
182+ serviceNameOverride );
183+ edgeStartNanoTicks = nanoTicks ;
184+ hash = newHash ;
185+
186+ pointConsumer .accept (point );
187+ log .debug ("Checkpoint set {}, hash source: {}" , this , pathwayHashBuilder );
195188 }
196189
197190 @ Override
@@ -205,52 +198,42 @@ public StatsPoint getSavedStats() {
205198 }
206199
207200 @ Override
208- public String encode () throws IOException {
209- lock .lock ();
210- try {
211- if (!started ) {
212- throw new IllegalStateException ("Context must be started to encode" );
213- }
201+ public synchronized String encode () throws IOException {
202+ if (!started ) {
203+ throw new IllegalStateException ("Context must be started to encode" );
204+ }
214205
215- outputBuffer .clear ();
216- outputBuffer .writeLongLE (hash );
206+ outputBuffer .clear ();
207+ outputBuffer .writeLongLE (hash );
217208
218- long pathwayStartMillis = TimeUnit .NANOSECONDS .toMillis (pathwayStartNanos );
219- VarEncodingHelper .encodeSignedVarLong (outputBuffer , pathwayStartMillis );
209+ long pathwayStartMillis = TimeUnit .NANOSECONDS .toMillis (pathwayStartNanos );
210+ VarEncodingHelper .encodeSignedVarLong (outputBuffer , pathwayStartMillis );
220211
221- long edgeStartMillis =
222- pathwayStartMillis
223- + TimeUnit .NANOSECONDS .toMillis (edgeStartNanoTicks - pathwayStartNanoTicks );
212+ long edgeStartMillis =
213+ pathwayStartMillis
214+ + TimeUnit .NANOSECONDS .toMillis (edgeStartNanoTicks - pathwayStartNanoTicks );
224215
225- VarEncodingHelper .encodeSignedVarLong (outputBuffer , edgeStartMillis );
226- byte [] base64 = Base64 .getEncoder ().encode (outputBuffer .trimmedCopy ());
227- return new String (base64 , ISO_8859_1 );
228- } finally {
229- lock .unlock ();
230- }
216+ VarEncodingHelper .encodeSignedVarLong (outputBuffer , edgeStartMillis );
217+ byte [] base64 = Base64 .getEncoder ().encode (outputBuffer .trimmedCopy ());
218+ return new String (base64 , ISO_8859_1 );
231219 }
232220
233221 @ Override
234- public String toString () {
235- lock .lock ();
236- try {
237- if (started ) {
238- return "PathwayContext[ Hash "
239- + Long .toUnsignedString (hash )
240- + ", Start: "
241- + pathwayStartNanos
242- + ", StartTicks: "
243- + pathwayStartNanoTicks
244- + ", Edge Start Ticks: "
245- + edgeStartNanoTicks
246- + ", objectHashcode:"
247- + hashCode ()
248- + "]" ;
249- } else {
250- return "PathwayContext [Not Started]" ;
251- }
252- } finally {
253- lock .unlock ();
222+ public synchronized String toString () {
223+ if (started ) {
224+ return "PathwayContext[ Hash "
225+ + Long .toUnsignedString (hash )
226+ + ", Start: "
227+ + pathwayStartNanos
228+ + ", StartTicks: "
229+ + pathwayStartNanoTicks
230+ + ", Edge Start Ticks: "
231+ + edgeStartNanoTicks
232+ + ", objectHashcode:"
233+ + hashCode ()
234+ + "]" ;
235+ } else {
236+ return "PathwayContext [Not Started]" ;
254237 }
255238 }
256239
@@ -334,6 +317,23 @@ public long addValue(String val) {
334317 currentHash = FNV64Hash .generateHash (currentHash + val , FNV64Hash .Version .v1 );
335318 return currentHash ;
336319 }
320+
321+ public long addValue (long val ) {
322+ byte [] b =
323+ new byte [] {
324+ (byte ) val ,
325+ (byte ) (val >> 8 ),
326+ (byte ) (val >> 16 ),
327+ (byte ) (val >> 24 ),
328+ (byte ) (val >> 32 ),
329+ (byte ) (val >> 40 ),
330+ (byte ) (val >> 48 ),
331+ (byte ) (val >> 56 )
332+ };
333+
334+ currentHash = FNV64Hash .continueHash (currentHash , b , FNV64Hash .Version .v1 );
335+ return currentHash ;
336+ }
337337 }
338338
339339 private static class PathwayHashBuilder {
0 commit comments