Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
cb1aedf
In-progress snapshot
DougLea Jan 6, 2025
5c4ca21
Better conform to STPE
DougLea Jan 7, 2025
bb0e6a6
Refactorings
DougLea Jan 9, 2025
f683d7f
Use pendingRemval queue
DougLea Jan 9, 2025
3801ba0
Reduce unparks
DougLea Jan 10, 2025
d630b40
Better pending queues
DougLea Jan 16, 2025
0dd4ab7
Merge branch 'openjdk:master' into JDK-8319447
DougLea Jan 16, 2025
7e04af5
Conform to default STPE policies
DougLea Jan 19, 2025
7288e32
Comment out racy test
DougLea Jan 19, 2025
b14e31c
Merge branch 'openjdk:master' into JDK-8319447
DougLea Jan 19, 2025
cb202b6
Reduce memory contention
DougLea Jan 21, 2025
1f0a5cf
Use nanoTimeOrigin
DougLea Jan 21, 2025
97a2920
Reduce nanoTime usage; extend tck tests
DougLea Jan 23, 2025
f9aa135
Simplify scheduler state tracking
DougLea Jan 23, 2025
798fe64
Refactor delay scheduler pool submissions
DougLea Jan 24, 2025
d083e91
improve removal cost balance
DougLea Jan 25, 2025
1a7f77c
Ensure negative nanotime offset
DougLea Jan 26, 2025
f832335
Merge branch 'openjdk:master' into JDK-8319447
DougLea Feb 2, 2025
3da4fd7
Solidify design; add documentation
DougLea Feb 2, 2025
49b1699
Separate out DelayScheduler.java
DougLea Feb 3, 2025
229da14
Rework FJP-DS connections
DougLea Feb 4, 2025
9fad8d4
Deal with commonPool parallelism zero; use in other juc classes; remo…
DougLea Feb 7, 2025
2e60dc9
Refactor schedule methods
DougLea Feb 8, 2025
a0db427
Isolate screening
DougLea Feb 8, 2025
d0f4af1
Support STPE policy methods
DougLea Feb 9, 2025
a6290ab
Reduce memory accesses
DougLea Feb 9, 2025
c839299
Simplify policy methods; improve layout
DougLea Feb 10, 2025
f1394c4
Rename DelayedTask to ScheduledForkJoinTask; misc other improvements
DougLea Feb 12, 2025
0e13955
Better accommodate CompletableFuture; use 4-ary heap; add javadocs; o…
DougLea Feb 15, 2025
14a7a6f
Reduce garbage retention; use trailing padding; add tests
DougLea Feb 16, 2025
bd58f41
Merge branch 'openjdk:master' into JDK-8319447
DougLea Feb 16, 2025
93aac79
Merge remote-tracking branch 'refs/remotes/origin/JDK-8319447' into J…
DougLea Feb 16, 2025
753d0e0
Add optional SubmitWithTimeout action
DougLea Feb 17, 2025
53516e9
Misc minor improvements and renamings for clarity
DougLea Feb 19, 2025
16815cc
Address feedback
DougLea Feb 21, 2025
84eaab0
Address review comments; ensure new internal methods can't clash with…
DougLea Feb 22, 2025
c9bc41a
Standardize parameter checking
DougLea Feb 23, 2025
b40513f
Address review comments; reactivation tweak
DougLea Feb 28, 2025
0c5d22a
Reduce volatile reads
DougLea Mar 1, 2025
5c0355b
Associate probes with carriers if Virtual (no doc updates yet)
DougLea Mar 8, 2025
f670910
Merge branch 'openjdk:master' into JDK-8319447
DougLea Mar 8, 2025
6fe1a3b
Disambiguate caller-runs vs Interruptible
DougLea Mar 9, 2025
9cc670b
Use SharedSecrets for ThreadLocalRandomProbe; other tweaks
DougLea Mar 11, 2025
a553179
Initial commit
AlanBateman Mar 13, 2025
172a235
Reword javadoc
DougLea Mar 13, 2025
9b51b7a
Use TC_MASK in accord with https://bugs.openjdk.org/browse/JDK-833001…
DougLea Mar 14, 2025
cfdd358
Merge branch 'pull/23702' into JDK-8351927
AlanBateman Mar 21, 2025
24422e4
Match indent of naster changes
DougLea Mar 22, 2025
b552c22
Merge branch 'openjdk:master' into JDK-8319447
DougLea Mar 22, 2025
b9ce7c5
Merge branch 'pull/23702' into JDK-8351927
AlanBateman Mar 24, 2025
67d77ba
Change micros to throughput in seconds
AlanBateman Mar 24, 2025
9cf0a75
Address review comments
DougLea Mar 25, 2025
4aabe6b
Merge branch 'openjdk:master' into JDK-8319447
DougLea Mar 25, 2025
a3f528d
Update
AlanBateman Mar 27, 2025
3237cc7
Address review comments
DougLea Mar 27, 2025
b872713
Typo
DougLea Mar 27, 2025
f29b9f2
Merge branch 'pull/23702' into JDK-8351927
AlanBateman Apr 2, 2025
50ac134
Merge
AlanBateman Apr 2, 2025
700f26b
Merge branch 'master' into JDK-8351927
AlanBateman Apr 3, 2025
52a9d5c
Merge branch 'master' into JDK-8351927
AlanBateman Apr 7, 2025
30eba77
Merge branch 'master' into JDK-8351927
AlanBateman Apr 7, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions src/java.base/share/classes/java/lang/System.java
Original file line number Diff line number Diff line change
Expand Up @@ -2290,10 +2290,6 @@ public Executor virtualThreadDefaultScheduler() {
return VirtualThread.defaultScheduler();
}

public Stream<ScheduledExecutorService> virtualThreadDelayedTaskSchedulers() {
return VirtualThread.delayedTaskSchedulers();
}

public StackWalker newStackWalkerInstance(Set<StackWalker.Option> options,
ContinuationScope contScope,
Continuation continuation) {
Expand Down
104 changes: 61 additions & 43 deletions src/java.base/share/classes/java/lang/VirtualThread.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2018, 2024, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2018, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand All @@ -24,7 +24,6 @@
*/
package java.lang;

import java.util.Arrays;
import java.util.Locale;
import java.util.Objects;
import java.util.concurrent.CountDownLatch;
Expand All @@ -38,7 +37,6 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import jdk.internal.event.VirtualThreadEndEvent;
import jdk.internal.event.VirtualThreadStartEvent;
import jdk.internal.event.VirtualThreadSubmitFailedEvent;
Expand Down Expand Up @@ -66,7 +64,6 @@ final class VirtualThread extends BaseVirtualThread {
private static final Unsafe U = Unsafe.getUnsafe();
private static final ContinuationScope VTHREAD_SCOPE = new ContinuationScope("VirtualThreads");
private static final ForkJoinPool DEFAULT_SCHEDULER = createDefaultScheduler();
private static final ScheduledExecutorService[] DELAYED_TASK_SCHEDULERS = createDelayedTaskSchedulers();

private static final long STATE = U.objectFieldOffset(VirtualThread.class, "state");
private static final long PARK_PERMIT = U.objectFieldOffset(VirtualThread.class, "parkPermit");
Expand Down Expand Up @@ -193,13 +190,6 @@ static Executor defaultScheduler() {
return DEFAULT_SCHEDULER;
}

/**
* Returns a stream of the delayed task schedulers used to support timed operations.
*/
static Stream<ScheduledExecutorService> delayedTaskSchedulers() {
return Arrays.stream(DELAYED_TASK_SCHEDULERS);
}

/**
* Returns the continuation scope used for virtual threads.
*/
Expand Down Expand Up @@ -567,8 +557,9 @@ private void afterYield() {
setState(newState = PARKED);
} else {
// schedule unpark
long timeout = this.timeout;
assert timeout > 0;
timeoutTask = schedule(this::unpark, timeout, NANOSECONDS);
timeoutTask = schedule(this::parkTimeoutExpired, timeout, NANOSECONDS);
setState(newState = TIMED_PARKED);
}

Expand Down Expand Up @@ -618,6 +609,7 @@ private void afterYield() {
// the timeout task to coordinate access to the sequence number and to
// ensure the timeout task doesn't execute until the thread has got to
// the TIMED_WAIT state.
long timeout = this.timeout;
assert timeout > 0;
synchronized (timedWaitLock()) {
byte seqNo = ++timedWaitSeqNo;
Expand Down Expand Up @@ -890,7 +882,19 @@ private void unblock() {
}

/**
* Invoked by timer thread when wait timeout for virtual thread has expired.
* Invoked by FJP worker thread or STPE thread when park timeout expires.
*/
private void parkTimeoutExpired() {
assert !VirtualThread.currentThread().isVirtual();
if (!getAndSetParkPermit(true)
&& (state() == TIMED_PARKED)
&& compareAndSetState(TIMED_PARKED, UNPARKED)) {
lazySubmitRunContinuation();
}
}

/**
* Invoked by FJP worker thread or STPE thread when wait timeout expires.
* If the virtual thread is in timed-wait then this method will unblock the thread
* and submit its task so that it continues and attempts to reenter the monitor.
* This method does nothing if the thread has been woken by notify or interrupt.
Expand All @@ -913,7 +917,7 @@ private void waitTimeoutExpired(byte seqNo) {
}
}
if (unblocked) {
submitRunContinuation();
lazySubmitRunContinuation();
return;
}
// need to retry when thread is suspended in time-wait
Expand Down Expand Up @@ -1444,40 +1448,54 @@ private static ForkJoinPool createDefaultScheduler() {
/**
* Schedule a runnable task to run after a delay.
*/
private static Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
long tid = Thread.currentThread().threadId();
int index = (int) tid & (DELAYED_TASK_SCHEDULERS.length - 1);
return DELAYED_TASK_SCHEDULERS[index].schedule(command, delay, unit);
private Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
if (scheduler instanceof ForkJoinPool pool) {
return pool.schedule(command, delay, unit);
} else {
return DelayedTaskSchedulers.schedule(command, delay, unit);
}
}

/**
* Creates the ScheduledThreadPoolExecutors used to execute delayed tasks.
* Supports scheduling a runnable task to run after a delay. It uses a number
* of ScheduledThreadPoolExecutor instances to reduce contention on the delayed
* work queue used. This class is used when using a custom scheduler.
*/
private static ScheduledExecutorService[] createDelayedTaskSchedulers() {
String propName = "jdk.virtualThreadScheduler.timerQueues";
String propValue = System.getProperty(propName);
int queueCount;
if (propValue != null) {
queueCount = Integer.parseInt(propValue);
if (queueCount != Integer.highestOneBit(queueCount)) {
throw new RuntimeException("Value of " + propName + " must be power of 2");
}
} else {
int ncpus = Runtime.getRuntime().availableProcessors();
queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1);
private static class DelayedTaskSchedulers {
private static final ScheduledExecutorService[] INSTANCE = createDelayedTaskSchedulers();

static Future<?> schedule(Runnable command, long delay, TimeUnit unit) {
long tid = Thread.currentThread().threadId();
int index = (int) tid & (INSTANCE.length - 1);
return INSTANCE[index].schedule(command, delay, unit);
}
var schedulers = new ScheduledExecutorService[queueCount];
for (int i = 0; i < queueCount; i++) {
ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
Executors.newScheduledThreadPool(1, task -> {
Thread t = InnocuousThread.newThread("VirtualThread-unparker", task);
t.setDaemon(true);
return t;
});
stpe.setRemoveOnCancelPolicy(true);
schedulers[i] = stpe;

private static ScheduledExecutorService[] createDelayedTaskSchedulers() {
String propName = "jdk.virtualThreadScheduler.timerQueues";
String propValue = System.getProperty(propName);
int queueCount;
if (propValue != null) {
queueCount = Integer.parseInt(propValue);
if (queueCount != Integer.highestOneBit(queueCount)) {
throw new RuntimeException("Value of " + propName + " must be power of 2");
}
} else {
int ncpus = Runtime.getRuntime().availableProcessors();
queueCount = Math.max(Integer.highestOneBit(ncpus / 4), 1);
}
var schedulers = new ScheduledExecutorService[queueCount];
for (int i = 0; i < queueCount; i++) {
ScheduledThreadPoolExecutor stpe = (ScheduledThreadPoolExecutor)
Executors.newScheduledThreadPool(1, task -> {
Thread t = InnocuousThread.newThread("VirtualThread-unparker", task);
t.setDaemon(true);
return t;
});
stpe.setRemoveOnCancelPolicy(true);
schedulers[i] = stpe;
}
return schedulers;
}
return schedulers;
}

/**
Expand Down Expand Up @@ -1514,4 +1532,4 @@ private static void unblockVirtualThreads() {
unblocker.setDaemon(true);
unblocker.start();
}
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2003, 2024, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2003, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -45,7 +45,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Stream;

import jdk.internal.loader.NativeLibraries;
Expand Down Expand Up @@ -586,11 +585,6 @@ public interface JavaLangAccess {
*/
Executor virtualThreadDefaultScheduler();

/**
* Returns a stream of the delayed task schedulers used for virtual threads.
*/
Stream<ScheduledExecutorService> virtualThreadDelayedTaskSchedulers();

/**
* Creates a new StackWalker
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, 2024, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2023, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -50,19 +50,6 @@ private static byte[] printScheduler() {
sb.append(JLA.virtualThreadDefaultScheduler())
.append(System.lineSeparator());

// break
sb.append(System.lineSeparator());

// delayed task schedulers
sb.append("Delayed task schedulers:").append(System.lineSeparator());
var delayedTaskSchedulers = JLA.virtualThreadDelayedTaskSchedulers().toList();
IntStream.range(0, delayedTaskSchedulers.size())
.forEach(i -> sb.append('[')
.append(i)
.append("] ")
.append(delayedTaskSchedulers.get(i))
.append(System.lineSeparator()));

return sb.toString().getBytes(StandardCharsets.UTF_8);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, 2024, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2023, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
Expand Down Expand Up @@ -59,13 +59,11 @@ class VThreadCommandsTest {
*/
@Test
void testVThreadScheduler() {
// ensure default scheduler and timeout schedulers are initialized
// ensure default scheduler is initialized
Thread.startVirtualThread(() -> { });

jcmd("Thread.vthread_scheduler")
.shouldContain(Objects.toIdentityString(defaultScheduler()))
.shouldContain("Delayed task schedulers:")
.shouldContain("[0] " + ScheduledThreadPoolExecutor.class.getName());
.shouldContain(Objects.toIdentityString(defaultScheduler()));
}

/**
Expand Down
98 changes: 98 additions & 0 deletions test/micro/org/openjdk/bench/java/lang/VirtualThreadParking.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
/*
* Copyright (c) 2024, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package org.openjdk.bench.java.lang;

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import org.openjdk.jmh.annotations.*;

@BenchmarkMode(Mode.Throughput)
@State(Scope.Benchmark)
@Warmup(iterations = 5, time = 10, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 10, timeUnit = TimeUnit.SECONDS)
@Fork(value = 3)
@OutputTimeUnit(TimeUnit.SECONDS)
public class VirtualThreadParking {

@Param({"100", "1000", "10000"})
int threadCount;

/**
* Starts N threads that time-park, main thread unparks.
*/
@Benchmark
public void timedParkAndUnpark1() throws Exception {
var threads = new Thread[threadCount];
var unparked = new boolean[threadCount];
for (int i = 0; i < threadCount; i++) {
threads[i] = Thread.ofVirtual().start(() -> {
LockSupport.parkNanos(Long.MAX_VALUE);
});
}
int remaining = threadCount;
while (remaining > 0) {
for (int i = 0; i < threadCount; i++) {
if (!unparked[i]) {
Thread t = threads[i];
if (t.getState() == Thread.State.TIMED_WAITING) {
LockSupport.unpark(t);
unparked[i] = true;
remaining--;
}
}
}
if (remaining > 0) {
Thread.yield();
}
}
for (Thread t : threads) {
t.join();
}
}

/**
* Starts N threads that time-park, start another N threads to unpark.
*/
@Benchmark
public void timedParkAndUnpark2() throws Exception {
var threads = new Thread[threadCount * 2];
for (int i = 0; i < threadCount; i++) {
threads[i] = Thread.ofVirtual().start(() -> {
LockSupport.parkNanos(Long.MAX_VALUE);
});
}
for (int i = 0; i < threadCount; i++) {
Thread thread1 = threads[i];
Thread thread2 = Thread.ofVirtual().start(() -> {
while (thread1.getState() != Thread.State.TIMED_WAITING) {
Thread.yield();
}
LockSupport.unpark(thread1);
});
threads[threadCount + i] = thread2;
}
for (Thread t : threads) {
t.join();
}
}
}
Loading