@@ -399,3 +399,188 @@ size_t HeapRootSegments::segment_offset(size_t seg_idx) {
   return _base_offset + seg_idx * _max_size_in_bytes;
 }

+ArchiveWorkers ArchiveWorkers::_workers;
+
+ArchiveWorkers::ArchiveWorkers() :
+  _start_semaphore(0),
+  _end_semaphore(0),
+  _num_workers(0),
+  _started_workers(0),
+  _waiting_workers(0),
+  _running_workers(0),
+  _state(NOT_READY),
+  _task(nullptr) {
+}
+
+void ArchiveWorkers::initialize() {
+  assert(Atomic::load(&_state) == NOT_READY, "Should be");
+
+  Atomic::store(&_num_workers, max_workers());
+  Atomic::store(&_state, READY);
+
+  // Kick off pool startup by creating a single worker.
+  start_worker_if_needed();
+}
+
+int ArchiveWorkers::max_workers() {
+  // The pool is used for short-lived bursty tasks. We do not want to spend
+  // too much time creating and waking up threads unnecessarily. Plus, we do
+  // not want to overwhelm large machines. This is why we want to be very
+  // conservative about the number of workers actually needed.
+  return MAX2(0, log2i_graceful(os::active_processor_count()));
+}
+
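Sizing note: the worker count grows only logarithmically with the machine size. Assuming `log2i_graceful` returns the floor of log2 (clamped by `MAX2` when it is negative), an 8-CPU machine gets 3 workers, a 64-CPU machine gets 6, and a single-CPU machine gets 0, which makes `is_parallel()` return false and routes every task through the single-threaded path below.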
+bool ArchiveWorkers::is_parallel() {
+  return _num_workers > 0;
+}
+
+void ArchiveWorkers::shutdown() {
+  while (true) {
+    State state = Atomic::load(&_state);
+    if (state == SHUTDOWN) {
+      // Already shut down.
+      return;
+    }
+    if (Atomic::cmpxchg(&_state, state, SHUTDOWN, memory_order_relaxed) == state) {
+      if (is_parallel()) {
+        // Execute a shutdown task and block until all workers respond.
+        run_task(&_shutdown_task);
+      }
+    }
+  }
+}
+
+void ArchiveWorkers::start_worker_if_needed() {
+  while (true) {
+    int cur = Atomic::load(&_started_workers);
+    if (cur >= _num_workers) {
+      return;
+    }
+    if (Atomic::cmpxchg(&_started_workers, cur, cur + 1, memory_order_relaxed) == cur) {
+      new ArchiveWorkerThread(this);
+      return;
+    }
+  }
+}
+
+void ArchiveWorkers::signal_worker_if_needed() {
+  while (true) {
+    int cur = Atomic::load(&_waiting_workers);
+    if (cur == 0) {
+      return;
+    }
+    if (Atomic::cmpxchg(&_waiting_workers, cur, cur - 1, memory_order_relaxed) == cur) {
+      _start_semaphore.signal(1);
+      return;
+    }
+  }
+}
+
+void ArchiveWorkers::run_task(ArchiveWorkerTask* task) {
+  assert((Atomic::load(&_state) == READY) ||
+         ((Atomic::load(&_state) == SHUTDOWN) && (task == &_shutdown_task)),
+         "Should be in correct state");
+  assert(Atomic::load(&_task) == nullptr, "Should not have running tasks");
+
+  if (is_parallel()) {
+    run_task_multi(task);
+  } else {
+    run_task_single(task);
+  }
+}
+
+void ArchiveWorkers::run_task_single(ArchiveWorkerTask* task) {
+  // Single thread needs no chunking.
+  task->configure_max_chunks(1);
+
+  // Execute the task ourselves, as there are no workers.
+  task->work(0, 1);
+}
+
+void ArchiveWorkers::run_task_multi(ArchiveWorkerTask* task) {
+  // Multiple threads can work with multiple chunks.
+  task->configure_max_chunks(_num_workers * CHUNKS_PER_WORKER);
+
+  // Set up the run and publish the task.
+  Atomic::store(&_waiting_workers, _num_workers);
+  Atomic::store(&_running_workers, _num_workers);
+  Atomic::release_store(&_task, task);
+
+  // Kick off pool wakeup by signaling a single worker, and proceed
+  // immediately to executing the task locally.
+  signal_worker_if_needed();
+
+  // Execute the task ourselves, while workers are catching up.
+  // This allows us to hide parts of task handoff latency.
+  task->run();
+
+  // Done executing task locally, wait for any remaining workers to complete,
+  // and then do the final housekeeping.
+  _end_semaphore.wait();
+  Atomic::store(&_task, (ArchiveWorkerTask*) nullptr);
+  OrderAccess::fence();
+
+  assert(Atomic::load(&_waiting_workers) == 0, "All workers were signaled");
+  assert(Atomic::load(&_running_workers) == 0, "No workers are running");
+}
+
+void ArchiveWorkerTask::run() {
+  while (true) {
+    int chunk = Atomic::load(&_chunk);
+    if (chunk >= _max_chunks) {
+      return;
+    }
+    if (Atomic::cmpxchg(&_chunk, chunk, chunk + 1, memory_order_relaxed) == chunk) {
+      assert(0 <= chunk && chunk < _max_chunks, "Sanity");
+      work(chunk, _max_chunks);
+    }
+  }
+}
+
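Workers and the submitting thread all claim chunk indices from the shared `_chunk` counter above, so a concrete task only has to map a chunk index onto a slice of its input in `work()`. A minimal sketch of such a task; `ChecksumTask`, its buffer layout, and the name passed to the base constructor are illustrative assumptions, not part of this patch:

```cpp
// Hypothetical task: checksum a buffer in parallel chunks. The
// ArchiveWorkerTask(const char*) base constructor is assumed from context.
class ChecksumTask : public ArchiveWorkerTask {
private:
  const char* _buf;
  size_t _size;
  volatile uint64_t _sum;

public:
  ChecksumTask(const char* buf, size_t size) :
    ArchiveWorkerTask("Checksum"), _buf(buf), _size(size), _sum(0) {}

  void work(int chunk, int max_chunks) override {
    // Map the claimed chunk index onto a contiguous slice of the buffer.
    size_t start = _size * chunk / max_chunks;
    size_t end   = _size * (chunk + 1) / max_chunks;
    uint64_t local = 0;
    for (size_t i = start; i < end; i++) {
      local += (unsigned char) _buf[i];
    }
    Atomic::add(&_sum, local);
  }

  uint64_t sum() const { return _sum; }
};
```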
+void ArchiveWorkerTask::configure_max_chunks(int max_chunks) {
+  if (_max_chunks == 0) {
+    _max_chunks = max_chunks;
+  }
+}
+
+bool ArchiveWorkers::run_as_worker() {
+  assert(is_parallel(), "Should be in parallel mode");
+  _start_semaphore.wait();
+
+  // Avalanche wakeups: each worker signals two others.
+  signal_worker_if_needed();
+  signal_worker_if_needed();
+
+  ArchiveWorkerTask* task = Atomic::load_acquire(&_task);
+  task->run();
+
+  // All work done in threads should be visible to caller.
+  OrderAccess::fence();
+
+  // Signal the pool the tasks are complete, if this is the last worker.
+  if (Atomic::sub(&_running_workers, 1, memory_order_relaxed) == 0) {
+    _end_semaphore.signal();
+  }
+
+  // Continue if task was not a termination task.
+  return (task != &_shutdown_task);
+}
+
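The wakeup fan-out mirrors the thread-startup fan-out below: the submitting thread signals one waiting worker, and every worker that wakes up signals two more before touching the task. With N workers, that spreads the signaling over roughly log2(N) rounds, instead of making the submitter issue all N signals before it can start on the task itself.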
+ArchiveWorkerThread::ArchiveWorkerThread(ArchiveWorkers* pool) : NamedThread(), _pool(pool) {
+  set_name("ArchiveWorkerThread");
+  os::create_thread(this, os::os_thread);
+  os::start_thread(this);
+}
+
+void ArchiveWorkerThread::run() {
+  // Avalanche thread startup: each starting worker starts two others.
+  _pool->start_worker_if_needed();
+  _pool->start_worker_if_needed();
+
+  // Set ourselves up.
+  os::set_priority(this, NearMaxPriority);
+
+  while (_pool->run_as_worker()) {
+    // Work until terminated.
+  }
+}
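
Taken together, a caller drives the pool with `initialize()` once, `run_task()` per burst of work, and `shutdown()` at the end. A rough sketch of that flow, reusing the hypothetical `ChecksumTask` from above; the `ArchiveWorkers::workers()` accessor for the `_workers` singleton is an assumption based on this file alone:

```cpp
// Hypothetical driver; workers() and ChecksumTask are illustrative only.
static uint64_t checksum_archive(const char* buf, size_t size) {
  ArchiveWorkers::workers()->initialize();    // size the pool, start the first worker

  ChecksumTask task(buf, size);
  ArchiveWorkers::workers()->run_task(&task); // returns once every chunk is processed

  ArchiveWorkers::workers()->shutdown();      // run the shutdown task; workers exit
  return task.sum();
}
```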