From f5908e36a2134ee0a73814642b85e354df2eb45d Mon Sep 17 00:00:00 2001 From: Zhengyu Gu Date: Wed, 18 Jun 2025 10:34:27 -0400 Subject: [PATCH] Improve performance using Unsafe to activate/deactivate thread filter --- ddprof-lib/src/main/cpp/javaApi.cpp | 21 ++++++ ddprof-lib/src/main/cpp/threadFilter.cpp | 37 +++++++--- ddprof-lib/src/main/cpp/threadFilter.h | 12 +++- .../com/datadoghq/profiler/ActiveBitmap.java | 71 +++++++++++++++++++ .../com/datadoghq/profiler/JavaProfiler.java | 37 ++++++---- 5 files changed, 152 insertions(+), 26 deletions(-) create mode 100644 ddprof-lib/src/main/java/com/datadoghq/profiler/ActiveBitmap.java diff --git a/ddprof-lib/src/main/cpp/javaApi.cpp b/ddprof-lib/src/main/cpp/javaApi.cpp index 7d45e4227..017f23c45 100644 --- a/ddprof-lib/src/main/cpp/javaApi.cpp +++ b/ddprof-lib/src/main/cpp/javaApi.cpp @@ -406,3 +406,24 @@ Java_com_datadoghq_profiler_JVMAccess_healthCheck0(JNIEnv *env, jobject unused) { return true; } + +extern "C" DLLEXPORT jlong JNICALL +Java_com_datadoghq_profiler_ActiveBitmap_bitmapAddressFor0(JNIEnv *env, + jclass unused, + jint tid) { + u64* bitmap = Profiler::instance()->threadFilter()->bitmapAddressFor((int)tid); + return (jlong)bitmap; +} + +extern "C" DLLEXPORT jboolean JNICALL +Java_com_datadoghq_profiler_ActiveBitmap_isActive0(JNIEnv *env, + jclass unused, + jint tid) { + return Profiler::instance()->threadFilter()->accept((int)tid) ? JNI_TRUE : JNI_FALSE; +} + +extern "C" DLLEXPORT jlong JNICALL +Java_com_datadoghq_profiler_ActiveBitmap_getActiveCountAddr0(JNIEnv *env, + jclass unused) { + return (jlong)Profiler::instance()->threadFilter()->addressOfSize(); +} diff --git a/ddprof-lib/src/main/cpp/threadFilter.cpp b/ddprof-lib/src/main/cpp/threadFilter.cpp index 1e73a955c..31713ae5d 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.cpp +++ b/ddprof-lib/src/main/cpp/threadFilter.cpp @@ -18,6 +18,7 @@ #include "counters.h" #include "os.h" #include "reverse_bits.h" +#include #include #include @@ -96,19 +97,15 @@ int ThreadFilter::mapThreadId(int thread_id) { return tid; } -bool ThreadFilter::accept(int thread_id) { - thread_id = mapThreadId(thread_id); - u64 *b = bitmap(thread_id); - return b != NULL && (word(b, thread_id) & (1ULL << (thread_id & 0x3f))); -} - -void ThreadFilter::add(int thread_id) { - thread_id = mapThreadId(thread_id); - u64 *b = bitmap(thread_id); +// Get bitmap that contains the thread id, create one if it does not exist +u64* ThreadFilter::getBitmapFor(int thread_id) { + int index = thread_id / BITMAP_CAPACITY; + assert(index >= 0 && index < (int)_max_bitmaps); + u64* b = _bitmap[index]; if (b == NULL) { b = (u64 *)OS::safeAlloc(BITMAP_SIZE); u64 *oldb = __sync_val_compare_and_swap( - &_bitmap[(u32)thread_id / BITMAP_CAPACITY], NULL, b); + &_bitmap[index], NULL, b); if (oldb != NULL) { OS::safeFree(b, BITMAP_SIZE); b = oldb; @@ -116,7 +113,27 @@ void ThreadFilter::add(int thread_id) { trackPage(); } } + return b; +} +u64* ThreadFilter::bitmapAddressFor(int thread_id) { + u64* b = getBitmapFor(thread_id); + thread_id = mapThreadId(thread_id); + assert(b == bitmap(thread_id)); + return wordAddress(b, thread_id); +} + +bool ThreadFilter::accept(int thread_id) { + thread_id = mapThreadId(thread_id); + u64 *b = bitmap(thread_id); + return b != NULL && (word(b, thread_id) & (1ULL << (thread_id & 0x3f))); +} + +void ThreadFilter::add(int thread_id) { + u64 *b = getBitmapFor(thread_id); + assert (b != NULL); + thread_id = mapThreadId(thread_id); + assert(b == bitmap(thread_id)); u64 bit = 1ULL << (thread_id & 0x3f); if (!(__sync_fetch_and_or(&word(b, thread_id), bit) & bit)) { atomicInc(_size); diff --git a/ddprof-lib/src/main/cpp/threadFilter.h b/ddprof-lib/src/main/cpp/threadFilter.h index 0247f51e6..8db82f9fb 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.h +++ b/ddprof-lib/src/main/cpp/threadFilter.h @@ -51,14 +51,21 @@ class ThreadFilter { // todo: add thread safe APIs return bitmap[((u32)thread_id % BITMAP_CAPACITY) >> 6]; } + + u64* const wordAddress(u64 *bitmap, int thread_id) const { + return &bitmap[((u32)thread_id % BITMAP_CAPACITY) >> 6]; + } + + u64* getBitmapFor(int thread_id); public: ThreadFilter(); ThreadFilter(ThreadFilter &threadFilter) = delete; ~ThreadFilter(); - bool enabled() { return _enabled; } + bool enabled() const { return _enabled; } - int size() { return _size; } + int size() const { return _size; } + const volatile int* addressOfSize() const { return &_size; } void init(const char *filter); void clear(); @@ -66,6 +73,7 @@ class ThreadFilter { bool accept(int thread_id); void add(int thread_id); void remove(int thread_id); + u64* bitmapAddressFor(int thread_id); void collect(std::vector &v); }; diff --git a/ddprof-lib/src/main/java/com/datadoghq/profiler/ActiveBitmap.java b/ddprof-lib/src/main/java/com/datadoghq/profiler/ActiveBitmap.java new file mode 100644 index 000000000..7e1601595 --- /dev/null +++ b/ddprof-lib/src/main/java/com/datadoghq/profiler/ActiveBitmap.java @@ -0,0 +1,71 @@ +package com.datadoghq.profiler; + +import sun.misc.Unsafe; +import java.lang.reflect.Field; + + +class ActiveBitmap { + private static final Unsafe UNSAFE = JavaProfiler.UNSAFE; + + // Address to size field of ThreadFilter in native + private static long activeCountAddr; + + private static final ThreadLocal Address = new ThreadLocal() { + @Override protected Long initialValue() { + return -1L; + } + }; + + public static void initialize() { + activeCountAddr = getActiveCountAddr0(); + } + + // On native side, we reverse lower 16 bits of thread id when maps to bitmap bit. + // So the active bit position of the specific thread id maps to the reverse order + // of the second lowest byte of thread id. + static long getBitmask(int tid) { + int tmp = (tid >> 8) & 0xff ; + int bits = 0; + for (int index = 0; index <= 7; index++) { + if ((tmp & 0x01) == 0x01) { + bits |= 1 << (7 - index); + } + tmp >>>= 1; + } + return 1L << (bits & 0x3f); + } + + static void setActive(int tid, boolean active) { + long addr = Address.get(); + if (addr == -1) { + addr = bitmapAddressFor0(tid); + Address.set(addr); + } + long bitmask = getBitmask(tid); + long value = UNSAFE.getLong(addr); + long newVal = active ? (value | bitmask) : (value & ~bitmask); + + while (!UNSAFE.compareAndSwapLong(null, addr, value, newVal)) { + value = UNSAFE.getLong(addr); + newVal = active ? (value | bitmask) : (value & ~bitmask); + } + int delta = active ? 1 : -1; + assert activeCountAddr != 0; + UNSAFE.getAndAddInt(null, activeCountAddr, delta); + if (isActive0(tid) != active) { + throw new RuntimeException("SetActive Failed"); + } + + assert isActive0(tid) == active; + } + + + // Address of bitmap word that contains the active bit of this thread Id + private static native long bitmapAddressFor0(int tid); + + private static native long getActiveCountAddr0(); + + // For validation + private static native boolean isActive0(int tid); +} + diff --git a/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java b/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java index 4436273c6..851c4baae 100644 --- a/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java +++ b/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java @@ -34,17 +34,17 @@ * libjavaProfiler.so. */ public final class JavaProfiler { - private static final Unsafe UNSAFE; + static final Unsafe UNSAFE; + static final boolean isJDK8; static { Unsafe unsafe = null; String version = System.getProperty("java.version"); - if (version.startsWith("1.8")) { - try { - Field f = Unsafe.class.getDeclaredField("theUnsafe"); - f.setAccessible(true); - unsafe = (Unsafe) f.get(null); - } catch (Exception ignore) { } - } + isJDK8 = version.startsWith("1.8"); + try { + Field f = Unsafe.class.getDeclaredField("theUnsafe"); + f.setAccessible(true); + unsafe = (Unsafe) f.get(null); + } catch (Exception ignore) { } UNSAFE = unsafe; } @@ -108,6 +108,7 @@ public static synchronized JavaProfiler getInstance(String libLocation, String s throw new IOException("Failed to load Datadog Java profiler library", result.error); } init0(); + ActiveBitmap.initialize(); profiler.initializeContextStorage(); instance = profiler; @@ -128,7 +129,7 @@ private void initializeContextStorage() { if (this.contextStorage == null) { int maxPages = getMaxContextPages0(); if (maxPages > 0) { - if (UNSAFE != null) { + if (isJDK8) { contextBaseOffsets = new long[maxPages]; // be sure to choose an illegal address as a sentinel value Arrays.fill(contextBaseOffsets, Long.MIN_VALUE); @@ -208,7 +209,11 @@ public boolean recordTraceRoot(long rootSpanId, String endpoint, int sizeLimit) * 'filter' option must be enabled to use this method. */ public void addThread() { - filterThread0(true); + if (UNSAFE != null) { + ActiveBitmap.setActive(TID.get(), true); + } else { + filterThread0(true); + } } /** @@ -216,7 +221,11 @@ public void addThread() { * 'filter' option must be enabled to use this method. */ public void removeThread() { - filterThread0(false); + if (UNSAFE != null) { + ActiveBitmap.setActive(TID.get(), false); + } else { + filterThread0(false); + } } @@ -229,7 +238,7 @@ public void removeThread() { */ public void setContext(long spanId, long rootSpanId) { int tid = TID.get(); - if (UNSAFE != null) { + if (isJDK8) { setContextJDK8(tid, spanId, rootSpanId); } else { setContextByteBuffer(tid, spanId, rootSpanId); @@ -304,7 +313,7 @@ public void clearContext() { */ public void setContextValue(int offset, int value) { int tid = TID.get(); - if (UNSAFE != null) { + if (isJDK8) { setContextJDK8(tid, offset, value); } else { setContextByteBuffer(tid, offset, value); @@ -335,7 +344,7 @@ public void setContextByteBuffer(int tid, int offset, int value) { void copyTags(int[] snapshot) { int tid = TID.get(); - if (UNSAFE != null) { + if (isJDK8) { copyTagsJDK8(tid, snapshot); } else { copyTagsByteBuffer(tid, snapshot);