diff --git a/ddprof-lib/src/main/cpp/flightRecorder.cpp b/ddprof-lib/src/main/cpp/flightRecorder.cpp index ed6e55d0..1c8ee471 100644 --- a/ddprof-lib/src/main/cpp/flightRecorder.cpp +++ b/ddprof-lib/src/main/cpp/flightRecorder.cpp @@ -319,7 +319,7 @@ char *Recording::_jvm_flags = NULL; char *Recording::_java_command = NULL; Recording::Recording(int fd, Arguments &args) - : _fd(fd), _thread_set(), _method_map() { + : _fd(fd), _method_map() { args.save(_args); _chunk_start = lseek(_fd, 0, SEEK_END); @@ -331,6 +331,8 @@ Recording::Recording(int fd, Arguments &args) _bytes_written = 0; _tid = OS::threadId(); + _active_index.store(0, std::memory_order_relaxed); + VM::jvmti()->GetAvailableProcessors(&_available_processors); writeHeader(_buf); @@ -1060,11 +1062,18 @@ void Recording::writeExecutionModes(Buffer *buf) { } void Recording::writeThreads(Buffer *buf) { - addThread(_tid); - std::vector<int> threads; - threads.reserve(_thread_set.size()); - _thread_set.collect(threads); - _thread_set.clear(); + int old_index = _active_index.fetch_xor(1, std::memory_order_acq_rel); + // After the flip, new samples go into the new active set; + // we flush from old_index (the previous active set) + + std::unordered_set<int> threads; + threads.insert(_tid); + + for (int i = 0; i < CONCURRENCY_LEVEL; ++i) { + // Collect thread IDs from the fixed-size table into the main set + _thread_ids[i][old_index].collect(threads); + _thread_ids[i][old_index].clear(); + } Profiler *profiler = Profiler::instance(); ThreadInfo *t_info = &profiler->_thread_info; @@ -1073,15 +1082,15 @@ void Recording::writeThreads(Buffer *buf) { buf->putVar64(T_THREAD); buf->putVar64(threads.size()); - for (int i = 0; i < threads.size(); i++) { + for (auto tid : threads) { const char *thread_name; jlong thread_id; - std::pair<std::shared_ptr<std::string>, u64> info = t_info->get(threads[i]); + std::pair<std::shared_ptr<std::string>, u64> info = t_info->get(tid); if (info.first) { thread_name = info.first->c_str(); thread_id = info.second; } else { - snprintf(name_buf, sizeof(name_buf), "[tid=%d]", threads[i]); + snprintf(name_buf, sizeof(name_buf), "[tid=%d]", tid); thread_name = name_buf; thread_id = 0; } @@ -1091,9 +1100,9 @@ (thread_id == 0 ?
length + 1 : 2 * length) - 3 * 10; // 3x max varint length flushIfNeeded(buf, required); - buf->putVar64(threads[i]); + buf->putVar64(tid); buf->putUtf8(thread_name, length); - buf->putVar64(threads[i]); + buf->putVar64(tid); if (thread_id == 0) { buf->put8(0); } else { @@ -1443,7 +1452,11 @@ void Recording::recordCpuLoad(Buffer *buf, float proc_user, float proc_system, flushIfNeeded(buf); } -void Recording::addThread(int tid) { _thread_set.add(tid); } +// Assumption: the caller holds the lock that corresponds to lock_index +void Recording::addThread(int lock_index, int tid) { + int active = _active_index.load(std::memory_order_acquire); + _thread_ids[lock_index][active].insert(tid); +} Error FlightRecorder::start(Arguments &args, bool reset) { const char *file = args.file(); @@ -1600,7 +1613,7 @@ void FlightRecorder::recordEvent(int lock_index, int tid, u64 call_trace_id, break; } _rec->flushIfNeeded(buf); - _rec->addThread(tid); + _rec->addThread(lock_index, tid); } } diff --git a/ddprof-lib/src/main/cpp/flightRecorder.h b/ddprof-lib/src/main/cpp/flightRecorder.h index f46bf40f..717470fb 100644 --- a/ddprof-lib/src/main/cpp/flightRecorder.h +++ b/ddprof-lib/src/main/cpp/flightRecorder.h @@ -8,6 +8,7 @@ #define _FLIGHTRECORDER_H #include +#include <atomic> #include #include @@ -24,6 +25,7 @@ #include "mutex.h" #include "objectSampler.h" #include "threadFilter.h" +#include "threadIdTable.h" #include "vmEntry.h" const u64 MAX_JLONG = 0x7fffffffffffffffULL; @@ -117,9 +119,13 @@ class Recording { static char *_java_command; RecordingBuffer _buf[CONCURRENCY_LEVEL]; + // We use several tables to avoid lock contention; + // the second dimension allows switching the active table + ThreadIdTable _thread_ids[CONCURRENCY_LEVEL][2]; + std::atomic<int> _active_index{0}; // 0 or 1 globally + int _fd; off_t _chunk_start; - ThreadFilter _thread_set; MethodMap _method_map; Arguments _args; @@ -248,7 +254,8 @@ class Recording { LockEvent *event); void recordCpuLoad(Buffer *buf, float proc_user, float proc_system, float machine_total); - void addThread(int tid); + + void addThread(int lock_index, int tid); }; class Lookup { diff --git a/ddprof-lib/src/main/cpp/javaApi.cpp b/ddprof-lib/src/main/cpp/javaApi.cpp index 6033255d..d72a7e5d 100644 --- a/ddprof-lib/src/main/cpp/javaApi.cpp +++ b/ddprof-lib/src/main/cpp/javaApi.cpp @@ -16,6 +16,7 @@ #include +#include "arch_dd.h" #include "context.h" #include "counters.h" #include "engine.h" @@ -124,19 +125,103 @@ Java_com_datadoghq_profiler_JavaProfiler_getSamples(JNIEnv *env, return (jlong)Profiler::instance()->total_samples(); } +// There is some duplication between add and remove, but we want to avoid an extra branch in the hot path +extern "C" DLLEXPORT void JNICALL +Java_com_datadoghq_profiler_JavaProfiler_filterThreadAdd0(JNIEnv *env, + jobject unused) { + ProfiledThread *current = ProfiledThread::current(); + if (unlikely(current == nullptr)) { + assert(false); + return; + } + int tid = current->tid(); + if (unlikely(tid < 0)) { + return; + } + ThreadFilter *thread_filter = Profiler::instance()->threadFilter(); + if (unlikely(!thread_filter->enabled())) { + return; + } + + int slot_id = current->filterSlotId(); + if (unlikely(slot_id == -1)) { + // Thread doesn't have a slot ID yet (e.g., main thread), so register it + // This happens when the filter was not enabled before the thread started + slot_id = thread_filter->registerThread(); + current->setFilterSlotId(slot_id); + } + + if (unlikely(slot_id == -1)) { + return; // Failed to register thread + } + thread_filter->add(tid, slot_id); +} +
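[Review note] For readers following the new JNI entry points in this file, here is a minimal, self-contained sketch of the slot lifecycle they rely on, using only the ThreadFilter API introduced in this PR (the tid value 1234 is a stand-in for the caller's OS thread ID):

```cpp
#include <cstdio>
#include "threadFilter.h"

int main() {
  ThreadFilter filter;
  filter.init("");                // any non-null filter string enables filtering
  ThreadFilter::SlotID slot = filter.registerThread();  // normally done in onThreadStart
  if (slot < 0) return 1;         // filter disabled or all kMaxThreads slots in use
  filter.add(1234, slot);         // filterThreadAdd0 path: publish this thread's tid
  std::printf("sampled: %d\n", filter.accept(slot));    // signal-safe hot-path check
  filter.remove(slot);            // filterThreadRemove0 path: stop sampling, keep the slot
  filter.unregisterThread(slot);  // onThreadEnd path: recycle the slot via the free list
  return 0;
}
```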
+extern "C" DLLEXPORT void JNICALL +Java_com_datadoghq_profiler_JavaProfiler_filterThreadRemove0(JNIEnv *env, + jobject unused) { + ProfiledThread *current = ProfiledThread::current(); + if (unlikely(current == nullptr)) { + assert(false); + return; + } + int tid = current->tid(); + if (unlikely(tid < 0)) { + return; + } + ThreadFilter *thread_filter = Profiler::instance()->threadFilter(); + if (unlikely(!thread_filter->enabled())) { + return; + } + + int slot_id = current->filterSlotId(); + if (unlikely(slot_id == -1)) { + // Thread doesn't have a slot ID yet - nothing to remove + return; + } + thread_filter->remove(slot_id); +} + +// Backward compatibility for existing code extern "C" DLLEXPORT void JNICALL Java_com_datadoghq_profiler_JavaProfiler_filterThread0(JNIEnv *env, jobject unused, jboolean enable) { - int tid = ProfiledThread::currentTid(); - if (tid < 0) { + ProfiledThread *current = ProfiledThread::current(); + if (unlikely(current == nullptr)) { + assert(false); + return; + } + int tid = current->tid(); + if (unlikely(tid < 0)) { return; } ThreadFilter *thread_filter = Profiler::instance()->threadFilter(); + if (unlikely(!thread_filter->enabled())) { + return; + } + + int slot_id = current->filterSlotId(); + if (unlikely(slot_id == -1)) { + if (enable) { + // Thread doesn't have a slot ID yet, so register it + assert(thread_filter->enabled() && "ThreadFilter should be enabled when trying to register thread"); + slot_id = thread_filter->registerThread(); + current->setFilterSlotId(slot_id); + } else { + // Thread doesn't have a slot ID yet - nothing to remove + return; + } + } + + if (unlikely(slot_id == -1)) { + return; // Failed to register thread + } + if (enable) { - thread_filter->add(tid); + thread_filter->add(tid, slot_id); } else { - thread_filter->remove(tid); + thread_filter->remove(slot_id); } } @@ -408,27 +493,6 @@ Java_com_datadoghq_profiler_JVMAccess_healthCheck0(JNIEnv *env, return true; } -extern "C" DLLEXPORT jlong JNICALL -Java_com_datadoghq_profiler_ActiveBitmap_bitmapAddressFor0(JNIEnv *env, - jclass unused, - jint tid) { - u64* bitmap = Profiler::instance()->threadFilter()->bitmapAddressFor((int)tid); - return (jlong)bitmap; -} - -extern "C" DLLEXPORT jboolean JNICALL -Java_com_datadoghq_profiler_ActiveBitmap_isActive0(JNIEnv *env, - jclass unused, - jint tid) { - return Profiler::instance()->threadFilter()->accept((int)tid) ? 
JNI_TRUE : JNI_FALSE; } - -extern "C" DLLEXPORT jlong JNICALL -Java_com_datadoghq_profiler_ActiveBitmap_getActiveCountAddr0(JNIEnv *env, - jclass unused) { - return (jlong)Profiler::instance()->threadFilter()->addressOfSize(); -} - // Static variable to track the current published context static otel_process_ctx_result* current_published_context = nullptr; diff --git a/ddprof-lib/src/main/cpp/profiler.cpp b/ddprof-lib/src/main/cpp/profiler.cpp index f0bbcc5f..e51b3dc7 100644 --- a/ddprof-lib/src/main/cpp/profiler.cpp +++ b/ddprof-lib/src/main/cpp/profiler.cpp @@ -104,10 +104,12 @@ void Profiler::addRuntimeStub(const void *address, int length, void Profiler::onThreadStart(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) { ProfiledThread::initCurrentThread(); - - int tid = ProfiledThread::currentTid(); + ProfiledThread *current = ProfiledThread::current(); + int tid = current->tid(); if (_thread_filter.enabled()) { - _thread_filter.remove(tid); + int slot_id = _thread_filter.registerThread(); + current->setFilterSlotId(slot_id); + _thread_filter.remove(slot_id); // Remove from filtering initially } updateThreadName(jvmti, jni, thread, true); @@ -116,16 +118,33 @@ } void Profiler::onThreadEnd(jvmtiEnv *jvmti, JNIEnv *jni, jthread thread) { - int tid = ProfiledThread::currentTid(); - if (_thread_filter.enabled()) { - _thread_filter.remove(tid); + ProfiledThread *current = ProfiledThread::current(); + int tid = -1; + + if (current != nullptr) { + // ProfiledThread is alive - do full cleanup and use efficient tid access + int slot_id = current->filterSlotId(); + tid = current->tid(); + + if (_thread_filter.enabled()) { + _thread_filter.unregisterThread(slot_id); + current->setFilterSlotId(-1); + } + + ProfiledThread::release(); + } else { + // ProfiledThread already cleaned up - try to get tid from JVMTI as fallback + tid = VMThread::nativeThreadId(jni, thread); + if (tid < 0) { + // No ProfiledThread AND can't get tid from JVMTI - nothing we can do + return; + } } - updateThreadName(jvmti, jni, thread, true); - + + // These can run as long as we have a valid tid + updateThreadName(jvmti, jni, thread, false); // false = not self _cpu_engine->unregisterThread(tid); - // unregister here because JNI callers generally don't know about thread exits _wall_engine->unregisterThread(tid); - ProfiledThread::release(); } int Profiler::registerThread(int tid) { @@ -1152,6 +1171,16 @@ Error Profiler::start(Arguments &args, bool reset) { } _thread_filter.init(args._filter); + + // Minor optimization: register the current thread (onThreadStart won't be called for it) + if (_thread_filter.enabled()) { + ProfiledThread *current = ProfiledThread::current(); + if (current != nullptr) { + int slot_id = _thread_filter.registerThread(); + current->setFilterSlotId(slot_id); + _thread_filter.remove(slot_id); // Remove from filtering initially (matches onThreadStart behavior) + } + } _cpu_engine = selectCpuEngine(args); _wall_engine = selectWallEngine(args); diff --git a/ddprof-lib/src/main/cpp/thread.h b/ddprof-lib/src/main/cpp/thread.h index 7942b972..9b5dad1c 100644 --- a/ddprof-lib/src/main/cpp/thread.h +++ b/ddprof-lib/src/main/cpp/thread.h @@ -48,11 +48,12 @@ class ProfiledThread : public ThreadLocalData { u32 _wall_epoch; u64 _call_trace_id; u32 _recording_epoch; + int _filter_slot_id; // Slot ID for thread filtering UnwindFailures _unwind_failures; ProfiledThread(int buffer_pos, int tid) : ThreadLocalData(), _pc(0), _span_id(0), _crash_depth(0),
_buffer_pos(buffer_pos), _tid(tid), _cpu_epoch(0), - _wall_epoch(0), _call_trace_id(0), _recording_epoch(0) {}; + _wall_epoch(0), _call_trace_id(0), _recording_epoch(0), _filter_slot_id(-1) {}; void releaseFromBuffer(); @@ -125,6 +126,9 @@ } static void signalHandler(int signo, siginfo_t *siginfo, void *ucontext); + + int filterSlotId() { return _filter_slot_id; } + void setFilterSlotId(int slotId) { _filter_slot_id = slotId; } }; #endif // _THREAD_H diff --git a/ddprof-lib/src/main/cpp/threadFilter.cpp b/ddprof-lib/src/main/cpp/threadFilter.cpp index 13e6c2ae..596af4e0 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.cpp +++ b/ddprof-lib/src/main/cpp/threadFilter.cpp @@ -1,5 +1,5 @@ /* - * Copyright 2020 Andrei Pangin + * Copyright 2025 Datadog, Inc * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,163 +13,286 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +// High-performance lock-free thread filter implementation +// +// PERFORMANCE CONTRACT: +// - add(), remove(), accept() are optimized for signal-safe hot paths +// - These methods assume slot_id comes from registerThread() (undefined behavior otherwise) #include "threadFilter.h" -#include "counters.h" +#include "arch_dd.h" #include "os.h" -#include "reverse_bits.h" + #include -#include -#include +#include +#include +#include +#include +#include -void trackPage() { - Counters::increment(THREAD_FILTER_PAGES, 1); - Counters::increment(THREAD_FILTER_BYTES, BITMAP_SIZE); -} +ThreadFilter::ShardHead ThreadFilter::_free_heads[ThreadFilter::kShardCount] {}; + +ThreadFilter::ThreadFilter() : _enabled(false) { + // Initialize chunk pointers to null (lazy allocation) + for (int i = 0; i < kMaxChunks; ++i) { + _chunks[i].store(nullptr, std::memory_order_relaxed); + } + _free_list = std::make_unique<FreeListNode[]>(kFreeListSize); -ThreadFilter::ThreadFilter() { - _max_thread_id = OS::getMaxThreadId(128 * 1024); - _max_bitmaps = (_max_thread_id + BITMAP_SIZE - 1) / BITMAP_SIZE; - u32 capacity = _max_bitmaps * sizeof(u64 *); - _bitmap = (u64 **)OS::safeAlloc(capacity); - memset(_bitmap, 0, capacity); - _bitmap[0] = (u64 *)OS::safeAlloc(BITMAP_SIZE); - trackPage(); - _enabled = false; - _size = 0; + // Initialize the first chunk + initializeChunk(0); + // ordering is fine because we are not enabled yet + initFreeList(); } +// This destructor is not called in practice because we keep the profiler alive. +// Memory orders should be adjusted if we ever want to unload the profiler, +// which could have a perf impact on reading the chunk variables.
ThreadFilter::~ThreadFilter() { - for (int i = 0; i < _max_bitmaps; i++) { - if (_bitmap[i] != NULL) { - OS::safeFree(_bitmap[i], BITMAP_SIZE); - } - } - if (_bitmap) { - OS::safeFree(_bitmap, _max_bitmaps * sizeof(u64 *)); - } + // Make the filter inert for any concurrent readers + _enabled.store(false, std::memory_order_release); + // Reset free-list heads and nodes first + for (int s = 0; s < kShardCount; ++s) { + _free_heads[s].head.store(-1, std::memory_order_relaxed); + } + for (int i = 0; i < kFreeListSize; ++i) { + _free_list[i].value.store(-1, std::memory_order_relaxed); + _free_list[i].next.store(-1, std::memory_order_relaxed); + } + // Publish 0 chunks to stop range scans (collect) + _num_chunks.store(0, std::memory_order_release); + // Detach and delete chunks + for (int i = 0; i < kMaxChunks; ++i) { + ChunkStorage* chunk = _chunks[i].exchange(nullptr, std::memory_order_acquire); + delete chunk; + } } -void ThreadFilter::init(const char *filter) { - if (filter == NULL) { - _enabled = false; - return; - } +void ThreadFilter::initializeChunk(int chunk_idx) { + if (chunk_idx >= kMaxChunks) return; + + // Check if chunk already exists + ChunkStorage* existing = _chunks[chunk_idx].load(std::memory_order_acquire); + if (existing != nullptr) return; - char *end; - do { - int id = strtol(filter, &end, 0); - if (id <= 0) { - break; + // Allocate and initialize new chunk completely before swapping + ChunkStorage* new_chunk = new ChunkStorage(); + for (auto& slot : new_chunk->slots) { + slot.value.store(-1, std::memory_order_relaxed); } - if (*end == '-') { - int to = strtol(end + 1, &end, 0); - while (id <= to) { - add(id++); - } + // Try to install it atomically + ChunkStorage* expected = nullptr; + if (_chunks[chunk_idx].compare_exchange_strong(expected, new_chunk, std::memory_order_release)) { + // Successfully installed } else { - add(id); + // Another thread beat us to it - clean up our allocation + delete new_chunk; + } +} + +ThreadFilter::SlotID ThreadFilter::registerThread() { + // If disabled, block new registrations + if (!_enabled.load(std::memory_order_acquire)) { + return -1; + } + + // First, try to get a slot from the free list (lock-free stack) + SlotID reused_slot = popFromFreeList(); + if (reused_slot >= 0) { + return reused_slot; + } + + // Allocate a new slot + SlotID index = _next_index.fetch_add(1, std::memory_order_relaxed); + if (index >= kMaxThreads) { + // Revert the increment and return failure + _next_index.fetch_sub(1, std::memory_order_relaxed); + return -1; + } + + const int chunk_idx = index >> kChunkShift; + + // Ensure the chunk is initialized (lock-free) + if (chunk_idx >= _num_chunks.load(std::memory_order_acquire)) { + // Update the chunk count atomically + int expected_chunks = chunk_idx; + int desired_chunks = chunk_idx + 1; + while (!_num_chunks.compare_exchange_weak(expected_chunks, desired_chunks, + std::memory_order_acq_rel)) { + if (expected_chunks > chunk_idx) { + break; // Another thread already updated it + } + desired_chunks = expected_chunks + 1; + } } - filter = end + 1; - } while (*end); + // Initialize the chunk if needed + initializeChunk(chunk_idx); - _enabled = true; + return index; } -void ThreadFilter::clear() { - for (int i = 0; i < _max_bitmaps; i++) { - if (_bitmap[i] != NULL) { - memset(_bitmap[i], 0, BITMAP_SIZE); +void ThreadFilter::initFreeList() { + // Initialize the free list storage + for (int i = 0; i < kFreeListSize; ++i) { + _free_list[i].value.store(-1, std::memory_order_relaxed); + _free_list[i].next.store(-1, 
std::memory_order_relaxed); + } + + // Reset the free heads for each shard + for (int s = 0; s < kShardCount; ++s) { + _free_heads[s].head.store(-1, std::memory_order_relaxed); } - } - _size = 0; } -// The mapping has to be reversible: f(f(x)) == x -int ThreadFilter::mapThreadId(int thread_id) { - // We want to map the thread_id inside the same bitmap - static_assert(BITMAP_SIZE >= (u16)0xffff, "Potential verflow"); - u16 lower16 = (u16)(thread_id & 0xffff); - lower16 = reverse16(lower16); - int tid = (thread_id & ~0xffff) | lower16; - return tid; +bool ThreadFilter::accept(SlotID slot_id) const { + // Fast path: if disabled, accept everything (relaxed to avoid fences on hot path) + if (unlikely(!_enabled.load(std::memory_order_relaxed))) { + return true; + } + if (unlikely(slot_id < 0)) return false; + + int chunk_idx = slot_id >> kChunkShift; + int slot_idx = slot_id & kChunkMask; + + // This is not a fast path like the add operation. + ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_acquire); + if (likely(chunk != nullptr)) { + return chunk->slots[slot_idx].value.load(std::memory_order_relaxed) != -1; + } + return false; } -// Get bitmap that contains the thread id, create one if it does not exist -u64* ThreadFilter::getBitmapFor(int thread_id) { - int index = thread_id / BITMAP_CAPACITY; - assert(index >= 0 && index < (int)_max_bitmaps); - u64* b = _bitmap[index]; - if (b == NULL) { - b = (u64 *)OS::safeAlloc(BITMAP_SIZE); - u64 *oldb = __sync_val_compare_and_swap( - &_bitmap[index], NULL, b); - if (oldb != NULL) { - OS::safeFree(b, BITMAP_SIZE); - b = oldb; - } else { - trackPage(); +void ThreadFilter::add(int tid, SlotID slot_id) { + // PRECONDITION: slot_id must be from registerThread() or negative + // Undefined behavior for invalid positive slot_ids (performance optimization) + if (slot_id < 0) return; + + int chunk_idx = slot_id >> kChunkShift; + int slot_idx = slot_id & kChunkMask; + + // Fast path: assume valid slot_id from registerThread() + ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_acquire); + if (likely(chunk != nullptr)) { + chunk->slots[slot_idx].value.store(tid, std::memory_order_release); } - } - return b; } -u64* ThreadFilter::bitmapAddressFor(int thread_id) { - u64* b = getBitmapFor(thread_id); - thread_id = mapThreadId(thread_id); - assert(b == bitmap(thread_id)); - return wordAddress(b, thread_id); +void ThreadFilter::remove(SlotID slot_id) { + // PRECONDITION: slot_id must be from registerThread() or negative + // Undefined behavior for invalid positive slot_ids (performance optimization) + if (slot_id < 0) return; + + int chunk_idx = slot_id >> kChunkShift; + int slot_idx = slot_id & kChunkMask; + + if (unlikely(chunk_idx >= kMaxChunks)) { + assert(false && "Invalid slot_id in ThreadFilter::remove - should not happen after wall clock fix"); + return; + } + + ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_acquire); + if (unlikely(chunk == nullptr)) { + return; + } + + chunk->slots[slot_idx].value.store(-1, std::memory_order_release); } -bool ThreadFilter::accept(int thread_id) { - thread_id = mapThreadId(thread_id); - u64 *b = bitmap(thread_id); - return b != NULL && (word(b, thread_id) & (1ULL << (thread_id & 0x3f))); +void ThreadFilter::unregisterThread(SlotID slot_id) { + if (slot_id < 0) return; + remove(slot_id); + pushToFreeList(slot_id); } -void ThreadFilter::add(int thread_id) { - u64 *b = getBitmapFor(thread_id); - assert (b != NULL); - thread_id = mapThreadId(thread_id); - assert(b == 
bitmap(thread_id)); - u64 bit = 1ULL << (thread_id & 0x3f); - if (!(__atomic_fetch_or(&word(b, thread_id), bit, __ATOMIC_RELAXED) & bit)) { - atomicInc(_size); - } +bool ThreadFilter::pushToFreeList(SlotID slot_id) { + // Lock-free sharded Treiber stack push + const int shard = shardOfSlot(slot_id); + auto& head = _free_heads[shard].head; // private cache-line + + for (int i = 0; i < kFreeListSize; ++i) { + int expected = -1; + if (_free_list[i].value.compare_exchange_strong( + expected, slot_id, std::memory_order_acq_rel)) { + // Link node into this shard's Treiber stack + int old_head; + do { + old_head = head.load(std::memory_order_acquire); + _free_list[i].next.store(old_head, std::memory_order_relaxed); + } while (!head.compare_exchange_weak(old_head, i, + std::memory_order_release, std::memory_order_relaxed)); + return true; + } + } + return false; // Free list full, slot is lost but this is rare } -void ThreadFilter::remove(int thread_id) { - thread_id = mapThreadId(thread_id); - u64 *b = bitmap(thread_id); - if (b == NULL) { - return; - } - - u64 bit = 1ULL << (thread_id & 0x3f); - if (__atomic_fetch_and(&word(b, thread_id), ~bit, __ATOMIC_RELAXED) & bit) { - atomicInc(_size, -1); - } +ThreadFilter::SlotID ThreadFilter::popFromFreeList() { + // Lock-free sharded Treiber stack pop + int hash = static_cast<int>(std::hash<std::thread::id>{}(std::this_thread::get_id())); + int start = shardOf(hash); + + for (int pass = 0; pass < kShardCount; ++pass) { + int s = (start + pass) & (kShardCount - 1); + auto& head = _free_heads[s].head; + + while (true) { + int node = head.load(std::memory_order_acquire); + if (node == -1) break; // shard empty → try next + + int next = _free_list[node].next.load(std::memory_order_relaxed); + if (head.compare_exchange_weak(node, next, + std::memory_order_release, + std::memory_order_relaxed)) + { + int id = _free_list[node].value.exchange(-1, + std::memory_order_relaxed); + _free_list[node].next.store(-1, std::memory_order_relaxed); + return id; + } + } + } + return -1; // Empty list } -void ThreadFilter::collect(std::vector<int> &v) { - for (int i = 0; i < _max_bitmaps; i++) { - u64 *b = _bitmap[i]; - if (b != NULL) { - int start_id = i * BITMAP_CAPACITY; - for (int j = 0; j < BITMAP_SIZE / sizeof(u64); j++) { - // Considering the functional impact, relaxed could be a reasonable - // order here - u64 word = __atomic_load_n(&b[j], __ATOMIC_ACQUIRE); - while (word != 0) { - int tid = start_id + j * 64 + __builtin_ctzl(word); - // restore thread id - tid = mapThreadId(tid); - v.push_back(tid); - word &= (word - 1); +void ThreadFilter::collect(std::vector<int>& tids) const { + tids.clear(); + + // Reserve space for efficiency + // The eventual resize is not the bottleneck, so we reserve a reasonable size + tids.reserve(512); + + // Scan only initialized chunks + int num_chunks = _num_chunks.load(std::memory_order_relaxed); + for (int chunk_idx = 0; chunk_idx < num_chunks; ++chunk_idx) { + ChunkStorage* chunk = _chunks[chunk_idx].load(std::memory_order_acquire); + if (chunk == nullptr) { + continue; // Skip unallocated chunks + } + + for (const auto& slot : chunk->slots) { + int slot_tid = slot.value.load(std::memory_order_relaxed); + if (slot_tid != -1) { + tids.push_back(slot_tid); + } } } - } - } + + // Optional: shrink if we over-reserved significantly + if (tids.capacity() > tids.size() * 2) { + tids.shrink_to_fit(); + } +} + +void ThreadFilter::init(const char* filter) { + // Simple logic: any filter value (including "0") enables filtering + // Only explicitly registered threads via
addThread() will be sampled + // Previously we had a syntax where we could manually force some thread IDs. + // This is no longer supported. + _enabled.store(filter != nullptr, std::memory_order_release); +} + +bool ThreadFilter::enabled() const { + return _enabled.load(std::memory_order_acquire); } diff --git a/ddprof-lib/src/main/cpp/threadFilter.h b/ddprof-lib/src/main/cpp/threadFilter.h index 7454be57..a9a0e0b4 100644 --- a/ddprof-lib/src/main/cpp/threadFilter.h +++ b/ddprof-lib/src/main/cpp/threadFilter.h @@ -1,5 +1,5 @@ /* - * Copyright 2020 Andrei Pangin + * Copyright 2025 Datadog, Inc * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -13,73 +13,83 @@ * See the License for the specific language governing permissions and * limitations under the License. */ - #ifndef _THREADFILTER_H #define _THREADFILTER_H -#include "arch_dd.h" +#include +#include #include +#include +#include -// The size of thread ID bitmap in bytes. Must be at least 64K to allow mmap() -const u32 BITMAP_SIZE = 65536; -// How many thread IDs one bitmap can hold -const u32 BITMAP_CAPACITY = BITMAP_SIZE * 8; +#include "arch_dd.h" -// ThreadFilter query operations must be lock-free and signal-safe; -// update operations are mostly lock-free, except rare bitmap allocations class ThreadFilter { -private: - // Total number of bitmaps required to hold the entire range of thread IDs - u32 _max_thread_id; - u32 _max_bitmaps; - u64 **_bitmap; - bool _enabled; - volatile int _size; - - u64 *bitmap(int thread_id) { - if (thread_id >= _max_thread_id) { - return NULL; - } - return __atomic_load_n( - &(_bitmap[static_cast<u32>(thread_id) / BITMAP_CAPACITY]), - __ATOMIC_ACQUIRE); - } - - static int mapThreadId(int thread_id); - - u64 &word(u64 *bitmap, int thread_id) { - // todo: add thread safe APIs - return bitmap[((u32)thread_id % BITMAP_CAPACITY) >> 6]; - } - - u64* const wordAddress(u64 *bitmap, int thread_id) const { - return &bitmap[((u32)thread_id % BITMAP_CAPACITY) >> 6]; - } - - u64* getBitmapFor(int thread_id); public: - ThreadFilter(); - ThreadFilter(ThreadFilter &threadFilter) = delete; - ~ThreadFilter(); - - bool enabled() const { return _enabled; } - - int size() const { return _size; } - const volatile int* addressOfSize() const { return &_size; } - - void init(const char *filter); - void clear(); + using SlotID = int; + + // Optimized limits for reasonable memory usage + static constexpr int kChunkSize = 256; + static constexpr int kChunkShift = 8; // log2(256) + static constexpr int kChunkMask = kChunkSize - 1; + static constexpr int kMaxThreads = 2048; + static constexpr int kMaxChunks = (kMaxThreads + kChunkSize - 1) / kChunkSize; // = 8 chunks + // High-performance free list using Treiber stack, 64 shards + static constexpr int kFreeListSize = 1024; // power-of-two for fast modulo + static constexpr int kShardCount = 64; // power-of-two for fast modulo + ThreadFilter(); + ~ThreadFilter(); + + void init(const char* filter); + void initFreeList(); + bool enabled() const; + // Hot path methods - slot_id MUST be from registerThread(), undefined behavior otherwise + bool accept(SlotID slot_id) const; + void add(int tid, SlotID slot_id); + void remove(SlotID slot_id); + void collect(std::vector<int>& tids) const; + + SlotID registerThread(); + void unregisterThread(SlotID slot_id); - inline bool isValid(int thread_id) { - return thread_id >= 0 && thread_id < _max_thread_id; - } - - bool accept(int thread_id); - void add(int thread_id); -
void remove(int thread_id); - u64* bitmapAddressFor(int thread_id); - - void collect(std::vector<int> &v); +private: + // Optimized slot structure with padding to avoid false sharing + struct alignas(DEFAULT_CACHE_LINE_SIZE) Slot { + std::atomic<int> value{-1}; + char padding[DEFAULT_CACHE_LINE_SIZE - sizeof(value)]; // Pad to cache line size + }; + static_assert(sizeof(Slot) == DEFAULT_CACHE_LINE_SIZE, "Slot must be exactly one cache line"); + + // Lock-free free list using a stack-like structure + struct FreeListNode { + std::atomic<int> value{-1}; + std::atomic<int> next{-1}; + }; + + // Pre-allocated chunk storage to eliminate mutex contention + struct ChunkStorage { + std::array<Slot, kChunkSize> slots; + }; + + std::atomic<bool> _enabled{false}; + + // Lazily allocated storage for chunks + std::atomic<ChunkStorage*> _chunks[kMaxChunks]; + std::atomic<int> _num_chunks{1}; + + // Lock-free slot allocation + std::atomic<int> _next_index{0}; + std::unique_ptr<FreeListNode[]> _free_list; + + struct alignas(DEFAULT_CACHE_LINE_SIZE) ShardHead { std::atomic<int> head{-1}; }; + static ShardHead _free_heads[kShardCount]; // one cache-line each + + static inline int shardOf(int tid) { return tid & (kShardCount - 1); } + static inline int shardOfSlot(int s){ return s & (kShardCount - 1); } + // Helper methods for lock-free operations + void initializeChunk(int chunk_idx); + bool pushToFreeList(SlotID slot_id); + SlotID popFromFreeList(); }; #endif // _THREADFILTER_H diff --git a/ddprof-lib/src/main/cpp/threadIdTable.h b/ddprof-lib/src/main/cpp/threadIdTable.h new file mode 100644 index 00000000..f7d3b58e --- /dev/null +++ b/ddprof-lib/src/main/cpp/threadIdTable.h @@ -0,0 +1,87 @@ +/* + * Copyright 2025 Datadog, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +#ifndef _THREADIDTABLE_H +#define _THREADIDTABLE_H + +#include <atomic> +#include <unordered_set> +#include "arch_dd.h" + +// Simple fixed size thread ID table class ThreadIdTable { +private: + // We have 256 slots per concurrency level (currently 16) + // This should cater for 4096 threads - if it turns out to be too small, we + // can increase it or make it configurable + static const int TABLE_SIZE = 256; // power of 2 + static const int TABLE_MASK = TABLE_SIZE - 1; // For fast bit masking + std::atomic<int> table[TABLE_SIZE]; + + int hash(int tid) const { + // Improved hash function with bit mixing to reduce clustering + unsigned int utid = static_cast<unsigned int>(tid); + utid ^= utid >> 16; // Mix high and low bits + return utid & TABLE_MASK; // Fast bit masking instead of modulo + } + +public: + ThreadIdTable() { + clear(); + } + + // Signal-safe insertion using atomic operations only + void insert(int tid) { + if (unlikely(tid == 0)) return; // Invalid thread ID, 0 is reserved for empty slots + + int start_slot = hash(tid); + for (int probe = 0; probe < TABLE_SIZE; probe++) { + int slot = (start_slot + probe) & TABLE_MASK; // Fast bit masking + int expected = 0; + + // Try to claim empty slot + if (table[slot].compare_exchange_strong(expected, tid, std::memory_order_relaxed)) { + return; // Successfully inserted + } + + // Check if already present (common case - threads insert multiple times) + if (likely(table[slot].load(std::memory_order_relaxed) == tid)) { + return; // Already exists + } + } + // Table full - thread ID will be lost, but this is rare and non-critical + // Could increment a counter here for diagnostics if needed + } + + // Clear the table (not signal-safe, called during buffer switch) + void clear() { + for (int i = 0; i < TABLE_SIZE; i++) { + table[i].store(0, std::memory_order_relaxed); + } + } + + // Collect all thread IDs into a set (not signal-safe, called during buffer switch) + void collect(std::unordered_set<int>& result) { + for (int i = 0; i < TABLE_SIZE; i++) { + int tid = table[i].load(std::memory_order_relaxed); + if (tid != 0) { + result.insert(tid); + } + } + } +}; + +#endif // _THREADIDTABLE_H \ No newline at end of file diff --git a/ddprof-lib/src/main/cpp/wallClock.cpp b/ddprof-lib/src/main/cpp/wallClock.cpp index 7f7e0213..abbd5110 100644 --- a/ddprof-lib/src/main/cpp/wallClock.cpp +++ b/ddprof-lib/src/main/cpp/wallClock.cpp @@ -16,6 +16,7 @@ #include "vmStructs_dd.h" #include #include +#include <algorithm> // For std::sort and std::binary_search std::atomic<bool> BaseWallClock::_enabled{false}; @@ -186,6 +187,14 @@ void WallClockJVMTI::timerLoop() { bool do_filter = threadFilter->enabled(); int self = OS::threadId(); + // If filtering is enabled, collect the filtered TIDs first + std::vector<int> filtered_tids; + if (do_filter) { + Profiler::instance()->threadFilter()->collect(filtered_tids); + // Sort the TIDs for efficient lookup + std::sort(filtered_tids.begin(), filtered_tids.end()); + } + for (int i = 0; i < threads_count; i++) { jthread thread = threads_ptr[i]; if (thread != nullptr) { @@ -194,12 +203,10 @@ continue; } int tid = nThread->osThreadId(); - if (!threadFilter->isValid(tid)) { - continue; - } - - if (tid != self && (!do_filter || threadFilter->accept(tid))) { - threads.push_back({nThread, thread, tid}); + if (tid != self && (!do_filter || + // Use binary search to efficiently find if tid is in filtered_tids + std::binary_search(filtered_tids.begin(), filtered_tids.end(), tid))) { + threads.push_back({nThread, thread}); } } } @@ -254,13 +261,17 @@ void
WallClockJVMTI::timerLoop() { } void WallClockASGCT::timerLoop() { + // todo: re-allocating the vector every time is not efficient auto collectThreads = [&](std::vector<int>& tids) { + // Get thread IDs from the filter if it's enabled + // Otherwise list all threads in the system if (Profiler::instance()->threadFilter()->enabled()) { Profiler::instance()->threadFilter()->collect(tids); } else { ThreadList *thread_list = OS::listThreads(); int tid = thread_list->next(); while (tid != -1) { + // Don't include the current thread if (tid != OS::threadId()) { tids.push_back(tid); } diff --git a/ddprof-lib/src/main/cpp/wallClock.h b/ddprof-lib/src/main/cpp/wallClock.h index e6af949e..17657625 100644 --- a/ddprof-lib/src/main/cpp/wallClock.h +++ b/ddprof-lib/src/main/cpp/wallClock.h @@ -11,6 +11,7 @@ #include "os.h" #include "profiler.h" #include "reservoirSampler.h" +#include "thread.h" #include "threadFilter.h" #include "threadState.h" #include "tsc.h" @@ -57,7 +58,15 @@ class BaseWallClock : public Engine { threads.reserve(reservoirSize); int self = OS::threadId(); ThreadFilter* thread_filter = Profiler::instance()->threadFilter(); - thread_filter->remove(self); + + // We don't want to profile ourselves in wall time + ProfiledThread* current = ProfiledThread::current(); + if (current != nullptr) { + int slot_id = current->filterSlotId(); + if (slot_id != -1) { + thread_filter->remove(slot_id); + } + } u64 startTime = TSC::ticks(); WallClockEpochEvent epoch(startTime); diff --git a/ddprof-lib/src/main/java/com/datadoghq/profiler/ActiveBitmap.java b/ddprof-lib/src/main/java/com/datadoghq/profiler/ActiveBitmap.java deleted file mode 100644 index 046573dd..00000000 --- a/ddprof-lib/src/main/java/com/datadoghq/profiler/ActiveBitmap.java +++ /dev/null @@ -1,67 +0,0 @@ -package com.datadoghq.profiler; - -import sun.misc.Unsafe; -import java.lang.reflect.Field; - - -class ActiveBitmap { - private static final Unsafe UNSAFE = JavaProfiler.UNSAFE; - - // Address to size field of ThreadFilter in native - private static long activeCountAddr; - - private static final ThreadLocal<Long> Address = new ThreadLocal<Long>() { - @Override protected Long initialValue() { - return -1L; - } - }; - - public static void initialize() { - activeCountAddr = getActiveCountAddr0(); - } - - // On native side, we reverse lower 16 bits of thread id when maps to bitmap bit. - // So the active bit position of the specific thread id maps to the reverse order - // of the second lowest byte of thread id. - static long getBitmask(int tid) { - int tmp = (tid >> 8) & 0xff ; - int bits = 0; - for (int index = 0; index <= 7; index++) { - if ((tmp & 0x01) == 0x01) { - bits |= 1 << (7 - index); - } - tmp >>>= 1; - } - return 1L << (bits & 0x3f); - } - - static void setActive(int tid, boolean active) { - long addr = Address.get(); - if (addr == -1) { - addr = bitmapAddressFor0(tid); - Address.set(addr); - } - long bitmask = getBitmask(tid); - long value = UNSAFE.getLong(addr); - long newVal = active ? (value | bitmask) : (value & ~bitmask); - - while (!UNSAFE.compareAndSwapLong(null, addr, value, newVal)) { - value = UNSAFE.getLong(addr); - newVal = active ? (value | bitmask) : (value & ~bitmask); - } - int delta = active ?
1 : -1; - assert activeCountAddr != 0; - UNSAFE.getAndAddInt(null, activeCountAddr, delta); - assert isActive0(tid) == active; - } - - - // Address of bitmap word that contains the active bit of this thread Id - private static native long bitmapAddressFor0(int tid); - - private static native long getActiveCountAddr0(); - - // For validation - private static native boolean isActive0(int tid); -} - diff --git a/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java b/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java index a7cda6bc..fbc96555 100644 --- a/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java +++ b/ddprof-lib/src/main/java/com/datadoghq/profiler/JavaProfiler.java @@ -109,7 +109,6 @@ public static synchronized JavaProfiler getInstance(String libLocation, String s throw new IOException("Failed to load Datadog Java profiler library", result.error); } init0(); - ActiveBitmap.initialize(); profiler.initializeContextStorage(); instance = profiler; @@ -210,11 +209,7 @@ public boolean recordTraceRoot(long rootSpanId, String endpoint, int sizeLimit) * 'filter' option must be enabled to use this method. */ public void addThread() { - if (UNSAFE != null) { - ActiveBitmap.setActive(TID.get(), true); - } else { - filterThread0(true); - } + filterThreadAdd0(); } /** @@ -222,14 +217,9 @@ public void addThread() { * 'filter' option must be enabled to use this method. */ public void removeThread() { - if (UNSAFE != null) { - ActiveBitmap.setActive(TID.get(), false); - } else { - filterThread0(false); - } + filterThreadRemove0(); } - /** * Passing context identifier to a profiler. This ID is thread-local and is dumped in * the JFR output only. 0 is a reserved value for "no-context". @@ -479,6 +469,10 @@ public Map<String, Long> getDebugCounters() { private static native boolean init0(); private native void stop0() throws IllegalStateException; private native String execute0(String command) throws IllegalArgumentException, IllegalStateException, IOException; + + private native void filterThreadAdd0(); + private native void filterThreadRemove0(); + // Backward compatibility for existing code private native void filterThread0(boolean enable); private static native int getTid0(); diff --git a/ddprof-lib/src/test/cpp/ddprof_ut.cpp b/ddprof-lib/src/test/cpp/ddprof_ut.cpp index a754c9b0..e8e3a4e4 100644 --- a/ddprof-lib/src/test/cpp/ddprof_ut.cpp +++ b/ddprof-lib/src/test/cpp/ddprof_ut.cpp @@ -14,6 +14,9 @@ #include #include #include + #include <algorithm> // For std::sort + #include + #include ssize_t callback(char* ptr, int len) { return len; } @@ -120,34 +123,6 @@ EXPECT_EQ(2048, Contexts::getMaxPages(2097152)); } - TEST(ThreadFilter, testThreadFilter) { - int maxTid = OS::getMaxThreadId(); - ThreadFilter filter; - filter.init(""); - ASSERT_TRUE(filter.enabled()); - EXPECT_EQ(0, filter.size()); - // increase step gradually to create different bit densities - int step = 1; - int size = 0; - for (int tid = 1; tid < maxTid - step - 1; tid += step, size++) { - EXPECT_FALSE(filter.accept(tid)); - filter.add(tid); - EXPECT_TRUE(filter.accept(tid)); - step++; - } - ASSERT_EQ(size, filter.size()); - std::vector<int> tids; - tids.reserve(size); - filter.collect(tids); - ASSERT_EQ(size, tids.size()); - for (int tid : tids) { - ASSERT_TRUE(filter.accept(tid)); - filter.remove(tid); - ASSERT_FALSE(filter.accept(tid)); - } - EXPECT_EQ(0, filter.size()); - } - TEST(ThreadInfoTest, testThreadInfoCleanupAllDead) { ThreadInfo info; info.set(1, "main", 1); diff --git
a/ddprof-lib/src/test/cpp/threadFilter_ut.cpp b/ddprof-lib/src/test/cpp/threadFilter_ut.cpp new file mode 100644 index 00000000..8cbeec99 --- /dev/null +++ b/ddprof-lib/src/test/cpp/threadFilter_ut.cpp @@ -0,0 +1,451 @@ +/* + * Copyright 2025 Datadog, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include <gtest/gtest.h> +#include "threadFilter.h" +#include +#include +#include +#include +#include +#include + +class ThreadFilterTest : public ::testing::Test { +protected: + void SetUp() override { + filter = std::make_unique<ThreadFilter>(); + filter->init(""); // Enable filtering + } + + void TearDown() override { + filter.reset(); + } + + std::unique_ptr<ThreadFilter> filter; +}; + +// Basic functionality tests +TEST_F(ThreadFilterTest, BasicRegisterAndAccept) { + EXPECT_TRUE(filter->enabled()); + + int slot_id = filter->registerThread(); + EXPECT_GE(slot_id, 0); + + // Initially should not accept (no tid added) + EXPECT_FALSE(filter->accept(slot_id)); + + // Add tid and test accept + filter->add(1234, slot_id); + EXPECT_TRUE(filter->accept(slot_id)); + + // Remove and test + filter->remove(slot_id); + EXPECT_FALSE(filter->accept(slot_id)); +} + +TEST_F(ThreadFilterTest, DisabledFilterAcceptsAll) { + ThreadFilter disabled_filter; + disabled_filter.init(nullptr); // Disabled + + EXPECT_FALSE(disabled_filter.enabled()); + EXPECT_TRUE(disabled_filter.accept(-1)); + EXPECT_TRUE(disabled_filter.accept(0)); + EXPECT_TRUE(disabled_filter.accept(999999)); +} + +TEST_F(ThreadFilterTest, InvalidSlotHandling) { + // Test invalid slot IDs for accept() - still safe due to negative check + EXPECT_FALSE(filter->accept(-1)); + EXPECT_FALSE(filter->accept(-999)); + + // These should not crash + filter->add(1234, -1); + filter->remove(-1); + filter->unregisterThread(-1); + filter->unregisterThread(-999); +} + +TEST_F(ThreadFilterTest, ValidSlotIDContract) { + // Verify that all slot IDs returned by registerThread() are valid + std::vector<int> slot_ids; + + for (int i = 0; i < 100; i++) { + int slot_id = filter->registerThread(); + ASSERT_GE(slot_id, 0) << "registerThread() returned invalid slot_id: " << slot_id; + ASSERT_LT(slot_id, ThreadFilter::kMaxThreads) << "slot_id out of range: " << slot_id; + + slot_ids.push_back(slot_id); + + // These operations should always be safe with slot_ids from registerThread() + filter->add(i + 10000, slot_id); + EXPECT_TRUE(filter->accept(slot_id)); + filter->remove(slot_id); + EXPECT_FALSE(filter->accept(slot_id)); + } + + // Verify slot IDs are unique (no duplicates) + std::set<int> unique_slots(slot_ids.begin(), slot_ids.end()); + EXPECT_EQ(unique_slots.size(), slot_ids.size()) << "registerThread() returned duplicate slot IDs"; +} + +// Edge case: Maximum capacity +TEST_F(ThreadFilterTest, MaxCapacityReached) { + std::vector<int> slot_ids; + + // Register up to the maximum + for (int i = 0; i < ThreadFilter::kMaxThreads; i++) { + int slot_id = filter->registerThread(); + if (slot_id >= 0) { + slot_ids.push_back(slot_id); + filter->add(i + 1000, slot_id); // Use unique tids + } + } + + fprintf(stderr,
"Successfully registered %zu slots (max=%d)\n", + slot_ids.size(), ThreadFilter::kMaxThreads); + + // Should have registered all slots + EXPECT_EQ(slot_ids.size(), ThreadFilter::kMaxThreads); + + // Next registration should fail + int overflow_slot = filter->registerThread(); + EXPECT_EQ(overflow_slot, -1); + + // Verify all registered slots work + std::vector collected_tids; + filter->collect(collected_tids); + EXPECT_EQ(collected_tids.size(), ThreadFilter::kMaxThreads); + + // Verify all tids are unique + std::set unique_tids(collected_tids.begin(), collected_tids.end()); + EXPECT_EQ(unique_tids.size(), ThreadFilter::kMaxThreads); +} + +// Edge case: Recovery after max capacity +TEST_F(ThreadFilterTest, RecoveryAfterMaxCapacity) { + std::vector slot_ids; + + // Fill to capacity + for (int i = 0; i < ThreadFilter::kMaxThreads; i++) { + int slot_id = filter->registerThread(); + ASSERT_GE(slot_id, 0); + slot_ids.push_back(slot_id); + filter->add(i + 2000, slot_id); + } + + // Should fail to register more + EXPECT_EQ(filter->registerThread(), -1); + + // Unregister half the slots + int slots_to_free = ThreadFilter::kMaxThreads / 2; + for (int i = 0; i < slots_to_free; i++) { + filter->unregisterThread(slot_ids[i]); + slot_ids[i] = -1; // Mark as freed + } + + // Should be able to register new slots again + std::vector new_slot_ids; + for (int i = 0; i < slots_to_free; i++) { + int slot_id = filter->registerThread(); + EXPECT_GE(slot_id, 0) << "Failed to register slot " << i << " after freeing"; + new_slot_ids.push_back(slot_id); + filter->add(i + 3000, slot_id); + } + + // Verify we can still register up to capacity + EXPECT_EQ(new_slot_ids.size(), slots_to_free); + + // Should fail again when at capacity + EXPECT_EQ(filter->registerThread(), -1); + + // Verify collect works correctly + std::vector collected_tids; + filter->collect(collected_tids); + EXPECT_EQ(collected_tids.size(), ThreadFilter::kMaxThreads); +} + +// Free list stress test +TEST_F(ThreadFilterTest, FreeListStressTest) { + const int iterations = 1000; + const int batch_size = 100; + + for (int iter = 0; iter < iterations; iter++) { + std::vector slot_ids; + + // Register a batch + for (int i = 0; i < batch_size; i++) { + int slot_id = filter->registerThread(); + ASSERT_GE(slot_id, 0); + slot_ids.push_back(slot_id); + filter->add(iter * batch_size + i, slot_id); + } + + // Verify all work + for (int slot_id : slot_ids) { + EXPECT_TRUE(filter->accept(slot_id)); + } + + // Unregister all + for (int slot_id : slot_ids) { + filter->unregisterThread(slot_id); + } + + // Verify cleanup + std::vector tids; + filter->collect(tids); + EXPECT_EQ(tids.size(), 0) << "Iteration " << iter << " left " << tids.size() << " tids"; + } +} + +// Multi-threaded edge case testing +TEST_F(ThreadFilterTest, ConcurrentMaxCapacityStress) { + const int num_threads = 8; + const int slots_per_thread = ThreadFilter::kMaxThreads / num_threads; + + std::vector threads; + std::atomic successful_registrations{0}; + std::atomic failed_registrations{0}; + std::vector> thread_slots(num_threads); + + // Each thread tries to register its share of slots + for (int t = 0; t < num_threads; t++) { + threads.emplace_back([&, t]() { + for (int i = 0; i < slots_per_thread + 10; i++) { // Try to over-register + int slot_id = filter->registerThread(); + if (slot_id >= 0) { + thread_slots[t].push_back(slot_id); + filter->add(t * 1000 + i, slot_id); + successful_registrations++; + } else { + failed_registrations++; + } + } + }); + } + + // Wait for all threads + for (auto& t 
: threads) { + t.join(); + } + + fprintf(stderr, "Successful: %d, Failed: %d, Total attempted: %d\n", + successful_registrations.load(), failed_registrations.load(), + num_threads * (slots_per_thread + 10)); + + // Should have registered exactly kMaxThreads + EXPECT_EQ(successful_registrations.load(), ThreadFilter::kMaxThreads); + EXPECT_GT(failed_registrations.load(), 0); // Some should have failed + + // Verify collect works + std::vector<int> collected_tids; + filter->collect(collected_tids); + EXPECT_EQ(collected_tids.size(), ThreadFilter::kMaxThreads); +} + +// Chunk boundary testing +TEST_F(ThreadFilterTest, ChunkBoundaryBehavior) { + std::vector<int> slot_ids; + + // Register enough slots to span multiple chunks + int slots_to_register = ThreadFilter::kChunkSize * 3 + 10; // 3+ chunks + + for (int i = 0; i < slots_to_register; i++) { + int slot_id = filter->registerThread(); + ASSERT_GE(slot_id, 0) << "Failed at slot " << i; + slot_ids.push_back(slot_id); + filter->add(i + 5000, slot_id); + } + + // Verify all chunks work correctly + for (int i = 0; i < slots_to_register; i++) { + EXPECT_TRUE(filter->accept(slot_ids[i])) << "Slot " << slot_ids[i] << " (index " << i << ") not accepted"; + } + + // Test collect across chunks + std::vector<int> collected_tids; + filter->collect(collected_tids); + EXPECT_EQ(collected_tids.size(), slots_to_register); + + // Verify tids are correct + std::sort(collected_tids.begin(), collected_tids.end()); + for (int i = 0; i < slots_to_register; i++) { + EXPECT_EQ(collected_tids[i], i + 5000) << "TID mismatch at position " << i; + } +} + +// Race condition testing for add/remove/accept +TEST_F(ThreadFilterTest, ConcurrentAddRemoveAccept) { + const int num_threads = 4; + const int operations_per_thread = 10000; + + // Pre-register slots for each thread + std::vector<int> slot_ids(num_threads); + for (int i = 0; i < num_threads; i++) { + slot_ids[i] = filter->registerThread(); + ASSERT_GE(slot_ids[i], 0); + } + + std::vector<std::thread> threads; + std::atomic<int> total_operations{0}; + + for (int t = 0; t < num_threads; t++) { + threads.emplace_back([&, t]() { + int slot_id = slot_ids[t]; + int tid = t + 6000; + + for (int i = 0; i < operations_per_thread; i++) { + // Add + filter->add(tid, slot_id); + + // Should accept + bool accepted = filter->accept(slot_id); + if (!accepted) { + fprintf(stderr, "Thread %d: accept failed after add (op %d)\n", t, i); + } + + // Remove + filter->remove(slot_id); + + // Should not accept + accepted = filter->accept(slot_id); + if (accepted) { + fprintf(stderr, "Thread %d: accept succeeded after remove (op %d)\n", t, i); + } + + total_operations++; + } + }); + } + + // Wait for all threads + for (auto& t : threads) { + t.join(); + } + + EXPECT_EQ(total_operations.load(), num_threads * operations_per_thread); + + // Final state should be empty + std::vector<int> final_tids; + filter->collect(final_tids); + EXPECT_EQ(final_tids.size(), 0); +} + +// Free list exhaustion and recovery +TEST_F(ThreadFilterTest, FreeListExhaustionRecovery) { + // Fill up the free list by registering and unregistering + std::vector<int> slot_ids; + + // Register many slots + for (int i = 0; i < ThreadFilter::kFreeListSize + 100; i++) { + int slot_id = filter->registerThread(); + if (slot_id >= 0) { + slot_ids.push_back(slot_id); + filter->add(i + 7000, slot_id); + } + } + + fprintf(stderr, "Registered %zu slots\n", slot_ids.size()); + + // Unregister all (this should fill the free list) + for (int slot_id : slot_ids) { + filter->unregisterThread(slot_id); + } + + // Try to register new slots
- should reuse from free list + std::vector<int> new_slot_ids; + for (int i = 0; i < 100; i++) { + int slot_id = filter->registerThread(); + EXPECT_GE(slot_id, 0) << "Failed to reuse slot " << i; + new_slot_ids.push_back(slot_id); + filter->add(i + 8000, slot_id); + } + + // Verify reused slots work + for (int slot_id : new_slot_ids) { + EXPECT_TRUE(filter->accept(slot_id)); + } + + std::vector<int> final_tids; + filter->collect(final_tids); + EXPECT_EQ(final_tids.size(), new_slot_ids.size()); +} + +// Performance regression test - only run in release builds +#ifdef NDEBUG +TEST_F(ThreadFilterTest, PerformanceRegression) { + const int num_operations = 100000; + + // Pre-register slots + std::vector<int> slot_ids; + for (int i = 0; i < 100; i++) { + int slot_id = filter->registerThread(); + ASSERT_GE(slot_id, 0); + slot_ids.push_back(slot_id); + } + + auto start = std::chrono::high_resolution_clock::now(); + + // Perform many add/accept/remove operations + for (int i = 0; i < num_operations; i++) { + int slot_id = slot_ids[i % slot_ids.size()]; + filter->add(i, slot_id); + bool accepted = filter->accept(slot_id); + EXPECT_TRUE(accepted); + filter->remove(slot_id); + } + + auto end = std::chrono::high_resolution_clock::now(); + auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start); + + fprintf(stderr, "Performance: %d operations in %ld microseconds (%.2f ns/op)\n", + num_operations, duration.count(), + (double)duration.count() * 1000.0 / num_operations); + + // Should be fast - less than 200ns per operation is reasonable for this complex test + EXPECT_LT(duration.count() * 1000.0 / num_operations, 200.0); // 200ns per op max +} +#endif // NDEBUG + +// Collect behavior with mixed states +TEST_F(ThreadFilterTest, CollectMixedStates) { + std::vector<int> slot_ids; + std::vector<int> expected_tids; + + // Register slots and add some tids, leave others empty + for (int i = 0; i < 50; i++) { + int slot_id = filter->registerThread(); + ASSERT_GE(slot_id, 0); + slot_ids.push_back(slot_id); + + if (i % 3 == 0) { // Add tid to every 3rd slot + filter->add(i + 9000, slot_id); + expected_tids.push_back(i + 9000); + } + // Leave other slots empty + } + + // Collect should only return slots with tids + std::vector<int> collected_tids; + filter->collect(collected_tids); + + std::sort(expected_tids.begin(), expected_tids.end()); + std::sort(collected_tids.begin(), collected_tids.end()); + + EXPECT_EQ(collected_tids.size(), expected_tids.size()); + for (size_t i = 0; i < expected_tids.size(); i++) { + EXPECT_EQ(collected_tids[i], expected_tids[i]); + } +} \ No newline at end of file diff --git a/ddprof-lib/src/test/cpp/threadIdTable_ut.cpp b/ddprof-lib/src/test/cpp/threadIdTable_ut.cpp new file mode 100644 index 00000000..59f99d44 --- /dev/null +++ b/ddprof-lib/src/test/cpp/threadIdTable_ut.cpp @@ -0,0 +1,263 @@ +/* + * Copyright 2025 Datadog, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +#include <gtest/gtest.h> +#include "threadIdTable.h" +#include +#include +#include +#include +#include +#include + +class ThreadIdTableTest : public ::testing::Test { +protected: + void SetUp() override { + table = std::make_unique<ThreadIdTable>(); + } + + void TearDown() override { + table.reset(); + } + + std::unique_ptr<ThreadIdTable> table; +}; + +// Basic functionality tests +TEST_F(ThreadIdTableTest, BasicInsertAndCollect) { + // Insert some thread IDs + table->insert(1001); + table->insert(1002); + table->insert(1003); + + // Collect and verify + std::unordered_set<int> result; + table->collect(result); + + EXPECT_EQ(result.size(), 3); + EXPECT_TRUE(result.count(1001)); + EXPECT_TRUE(result.count(1002)); + EXPECT_TRUE(result.count(1003)); +} + +TEST_F(ThreadIdTableTest, InvalidThreadIdHandling) { + // Invalid thread ID (0) should be ignored + table->insert(0); + + std::unordered_set<int> result; + table->collect(result); + EXPECT_EQ(result.size(), 0); + + // Negative thread IDs should still work (they're valid Linux thread IDs) + table->insert(-1); + table->collect(result); + EXPECT_EQ(result.size(), 1); + EXPECT_TRUE(result.count(-1)); +} + +TEST_F(ThreadIdTableTest, DuplicateInsertions) { + // Insert same thread ID multiple times + table->insert(2001); + table->insert(2001); + table->insert(2001); + + std::unordered_set<int> result; + table->collect(result); + + // Should only appear once + EXPECT_EQ(result.size(), 1); + EXPECT_TRUE(result.count(2001)); +} + +TEST_F(ThreadIdTableTest, ClearFunctionality) { + // Insert some thread IDs + table->insert(3001); + table->insert(3002); + table->insert(3003); + + // Verify they're there + std::unordered_set<int> result; + table->collect(result); + EXPECT_EQ(result.size(), 3); + + // Clear and verify empty + table->clear(); + result.clear(); + table->collect(result); + EXPECT_EQ(result.size(), 0); +} + +// Hash collision testing +TEST_F(ThreadIdTableTest, HashCollisions) { + // Create thread IDs that will hash to the same slot + // TABLE_SIZE = 256, so tids with same (tid % 256) will collide + std::vector<int> colliding_tids; + int base_tid = 1000; + + // Generate 10 thread IDs that hash to the same slot + for (int i = 0; i < 10; i++) { + int tid = base_tid + (i * 256); // All will hash to same slot + colliding_tids.push_back(tid); + table->insert(tid); + } + + // All should be stored (linear probing should handle collisions) + std::unordered_set<int> result; + table->collect(result); + + EXPECT_EQ(result.size(), colliding_tids.size()); + for (int tid : colliding_tids) { + EXPECT_TRUE(result.count(tid)) << "Missing tid: " << tid; + } +} + +// Capacity testing +TEST_F(ThreadIdTableTest, TableCapacityLimits) { + std::vector<int> inserted_tids; + + // Try to insert more than TABLE_SIZE (256) unique thread IDs + for (int i = 1; i <= 300; i++) { // More than TABLE_SIZE + table->insert(i + 10000); // Use high numbers to avoid conflicts + inserted_tids.push_back(i + 10000); + } + + // Collect and see how many were actually stored + std::unordered_set<int> result; + table->collect(result); + + fprintf(stderr, "Inserted %zu tids, collected %zu tids\n", + inserted_tids.size(), result.size()); + + // Should have stored at most TABLE_SIZE (256) + EXPECT_LE(result.size(), 256); + + // All collected tids should be from our inserted set + for (int tid : result) { + EXPECT_TRUE(std::find(inserted_tids.begin(), inserted_tids.end(), tid) != inserted_tids.end()) + << "Unexpected tid in result: " << tid; + } +} + +// Concurrent access testing (signal safety) +TEST_F(ThreadIdTableTest, ConcurrentInsertions) { + const int num_threads =
+
+// Concurrent access testing (signal safety)
+TEST_F(ThreadIdTableTest, ConcurrentInsertions) {
+  const int num_threads = 8;
+  const int tids_per_thread = 50;
+
+  std::vector<std::thread> threads;
+  std::atomic<int> successful_insertions{0};
+  std::vector<std::vector<int>> thread_tids(num_threads);
+
+  // Each thread inserts its own set of thread IDs
+  for (int t = 0; t < num_threads; t++) {
+    threads.emplace_back([&, t]() {
+      for (int i = 0; i < tids_per_thread; i++) {
+        int tid = t * 1000 + i + 20000; // Unique per thread
+        thread_tids[t].push_back(tid);
+        table->insert(tid);
+        successful_insertions++;
+      }
+    });
+  }
+
+  // Wait for all threads
+  for (auto& t : threads) {
+    t.join();
+  }
+
+  EXPECT_EQ(successful_insertions.load(), num_threads * tids_per_thread);
+
+  // Collect and verify
+  std::unordered_set<int> result;
+  table->collect(result);
+
+  // Gather every tid the workers inserted
+  std::set<int> all_expected_tids;
+  for (const auto& thread_tids_vec : thread_tids) {
+    for (int tid : thread_tids_vec) {
+      all_expected_tids.insert(tid);
+    }
+  }
+
+  fprintf(stderr, "Expected %zu unique tids, collected %zu tids\n",
+          all_expected_tids.size(), result.size());
+
+  // The table has a fixed capacity of 256, so with 400 unique tids we expect exactly 256
+  EXPECT_EQ(result.size(), std::min(all_expected_tids.size(), (size_t)256));
+
+  // All collected tids should be valid
+  for (int tid : result) {
+    EXPECT_TRUE(all_expected_tids.count(tid)) << "Unexpected tid: " << tid;
+  }
+}
+
+// Edge case: realistic thread ID patterns
+TEST_F(ThreadIdTableTest, RealisticThreadIds) {
+  // Linux thread IDs are typically large numbers
+  std::vector<int> realistic_tids = {
+      12345, 12346, 12347,    // Sequential
+      98765, 98766, 98767,    // Another sequence
+      1234567, 1234568,       // Large numbers
+      2147483647,             // Max int
+      -1, -2, -3              // Negative (valid in some contexts)
+  };
+
+  for (int tid : realistic_tids) {
+    table->insert(tid);
+  }
+
+  std::unordered_set<int> result;
+  table->collect(result);
+
+  // None of these tids is 0 (the only rejected value), so all should be stored
+  EXPECT_EQ(result.size(), realistic_tids.size());
+
+  for (int tid : realistic_tids) {
+    EXPECT_TRUE(result.count(tid)) << "Missing realistic tid: " << tid;
+  }
+}
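+
+// Sketch of the accumulate contract that Recording::writeThreads relies on:
+// the recorder pre-seeds the target set with its own tid, so collect() must
+// only add entries and never reset the set it is given.
+TEST_F(ThreadIdTableTest, CollectAccumulatesSketch) {
+  std::unordered_set<int> result;
+  result.insert(7777); // pre-seeded entry, like _tid in writeThreads
+
+  table->insert(7001);
+  table->collect(result);
+
+  EXPECT_EQ(result.size(), 2);
+  EXPECT_TRUE(result.count(7777));
+  EXPECT_TRUE(result.count(7001));
+}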
+
+// Performance test (release builds only)
+#ifdef NDEBUG
+TEST_F(ThreadIdTableTest, PerformanceRegression) {
+  const int num_operations = 100000;
+  std::vector<int> tids;
+
+  // Pre-generate thread IDs
+  for (int i = 0; i < 100; i++) {
+    tids.push_back(i + 30000);
+  }
+
+  auto start = std::chrono::high_resolution_clock::now();
+
+  // Perform many insertions
+  for (int i = 0; i < num_operations; i++) {
+    table->insert(tids[i % tids.size()]);
+  }
+
+  auto end = std::chrono::high_resolution_clock::now();
+  auto duration = std::chrono::duration_cast<std::chrono::microseconds>(end - start);
+
+  fprintf(stderr, "ThreadIdTable Performance: %d operations in %ld microseconds (%.2f ns/op)\n",
+          num_operations, (long)duration.count(),
+          (double)duration.count() * 1000.0 / num_operations);
+
+  // Should be very fast for signal-safe operations
+  EXPECT_LT(duration.count() * 1000.0 / num_operations, 50.0); // max 50 ns per op
+}
+#endif // NDEBUG
\ No newline at end of file
diff --git a/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/counters/ThreadFilterBenchmark.java b/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/counters/ThreadFilterBenchmark.java
index eab18e7a..fde05206 100644
--- a/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/counters/ThreadFilterBenchmark.java
+++ b/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/counters/ThreadFilterBenchmark.java
@@ -15,7 +15,10 @@
 @State(Scope.Benchmark)
 public class ThreadFilterBenchmark extends Configuration {
-    private static final int NUM_THREADS = 4;
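+    // JMH injects each @Param value before @Setup(Level.Trial) runs, so the
+    // filtered and unfiltered configurations execute as fully independent
+    // trials; a single value can also be pinned from the JMH command line,
+    // e.g. "-p useThreadFilters=false".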
+    @Param({"true", "false"}) // Parameterize the filter usage
+    public boolean useThreadFilters;
+
+    private static final int NUM_THREADS = 15;
     private ExecutorService executorService;
     private JavaProfiler profiler;
     private AtomicBoolean running;
@@ -30,13 +33,13 @@ public class ThreadFilterBenchmark extends Configuration {
     private static final AtomicIntegerArray atomicArray = new AtomicIntegerArray(ARRAY_SIZE);
     private static final int CACHE_LINE_SIZE = 64; // Typical cache line size
    private static final int STRIDE = CACHE_LINE_SIZE / Integer.BYTES; // Elements per cache line
-    private boolean useThreadFilters = true; // Flag to control the use of thread filters
     private AtomicLong addThreadCount = new AtomicLong(0);
     private AtomicLong removeThreadCount = new AtomicLong(0);

     @Setup(Level.Trial)
     public void setup() throws IOException {
         System.out.println("Setting up benchmark...");
+        System.out.println("Thread filters enabled: " + useThreadFilters);
         System.out.println("Creating thread pool with " + NUM_THREADS + " threads");
         executorService = Executors.newFixedThreadPool(NUM_THREADS);
         System.out.println("Getting profiler instance");
@@ -53,6 +56,7 @@ public void setup() throws IOException {
         System.out.println("Starting profiler with " + config);
         profiler.execute(config);
         System.out.println("Started profiler with output file");
+        running = new AtomicBoolean(true);
         operationCount = new AtomicLong(0);
         startTime = System.currentTimeMillis();
@@ -139,7 +143,7 @@ public void setUseThreadFilters(boolean useThreadFilters) {

     @Benchmark
     @BenchmarkMode(Mode.Throughput)
-    @Fork(value = 1, warmups = 0)
+    @Fork(value = 1, warmups = 1)
     @Warmup(iterations = 1, time = 1)
     @Measurement(iterations = 1, time = 2)
     @Threads(1)
@@ -149,14 +153,13 @@ public long threadFilterStress() throws InterruptedException {
         startLatch = new CountDownLatch(NUM_THREADS);
         stopLatch = new CountDownLatch(NUM_THREADS);

         // Start all worker threads
         for (int i = 0; i < NUM_THREADS; i++) {
             final int threadId = i;
             executorService.submit(() -> {
                 try {
                     startLatch.countDown();
                     startLatch.await(30, TimeUnit.SECONDS);
-
                     String startMsg = String.format("Thread %d started%n", threadId);
                     System.out.print(startMsg);
                     if (logWriter != null) {
@@ -215,15 +218,6 @@ public long threadFilterStress() throws InterruptedException {
                     }
                     operationCount.incrementAndGet();
                 }
-
-                if (operationCount.get() % 1000 == 0) {
-                    String progressMsg = String.format("Thread %d completed %d operations%n", threadId, operationCount.get());
-                    System.out.print(progressMsg);
-                    if (logWriter != null) {
-                        logWriter.print(progressMsg);
-                        logWriter.flush();
-                    }
-                }
             }
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
@@ -236,4 +230,4 @@ public long threadFilterStress() throws InterruptedException {
         stopLatch.await();
         return operationCount.get();
     }
-}
+}
\ No newline at end of file
diff --git a/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/throughput/ThreadFilterBenchmark.java b/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/throughput/ThreadFilterBenchmark.java
index bc03e0ca..f56bd258 100644
--- a/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/throughput/ThreadFilterBenchmark.java
+++ b/ddprof-stresstest/src/jmh/java/com/datadoghq/profiler/stresstest/scenarios/throughput/ThreadFilterBenchmark.java
@@ -13,6 +13,7 @@
 import org.openjdk.jmh.annotations.Scope;
 import org.openjdk.jmh.annotations.Setup;
 import org.openjdk.jmh.annotations.State;
+import org.openjdk.jmh.annotations.TearDown;
 import org.openjdk.jmh.annotations.Threads;
 import org.openjdk.jmh.annotations.Warmup;
 import org.openjdk.jmh.infra.Blackhole;
@@ -39,6 +40,17 @@ public class ThreadFilterBenchmark extends Configuration {
     public void setup() throws IOException {
         profiler = JavaProfiler.getInstance();
         workloadNum = Long.parseLong(workload);
+        // Start profiling to enable JVMTI ThreadStart callbacks
+        // Add JFR file parameter to satisfy profiler requirements
+        profiler.execute("start," + command + ",jfr,file=/tmp/thread-filter-benchmark.jfr");
+    }
+
+    @TearDown(Level.Trial)
+    public void tearDown() throws IOException {
+        // Stop profiling and clean up
+        if (profiler != null) {
+            profiler.execute("stop");
+        }
     }

     @Benchmark