Skip to content

Commit 175adc8

Browse files
committed
Abstract virtual threads implementation.
1 parent 7df4915 commit 175adc8

File tree

12 files changed

+424
-78
lines changed

12 files changed

+424
-78
lines changed

substratevm/src/com.oracle.svm.core.jdk11/src/com/oracle/svm/core/jdk11/Target_java_lang_StackWalker.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,11 +64,11 @@
6464
import com.oracle.svm.core.stack.JavaStackFrameVisitor;
6565
import com.oracle.svm.core.stack.JavaStackWalk;
6666
import com.oracle.svm.core.stack.JavaStackWalker;
67-
import com.oracle.svm.core.thread.JavaThreads;
6867
import com.oracle.svm.core.thread.LoomSupport;
6968
import com.oracle.svm.core.thread.Target_java_lang_Continuation;
7069
import com.oracle.svm.core.thread.Target_java_lang_ContinuationScope;
7170
import com.oracle.svm.core.thread.Target_java_lang_VirtualThread;
71+
import com.oracle.svm.core.thread.VirtualThreads;
7272
import com.oracle.svm.core.util.VMError;
7373

7474
@TargetClass(value = java.lang.StackWalker.class, onlyWith = JDK11OrLater.class)
@@ -149,7 +149,7 @@ private <T> T walk(Function<? super Stream<StackFrame>, ? extends T> function) {
149149
JavaStackWalk walk = StackValue.get(JavaStackWalk.class);
150150
Pointer sp = KnownIntrinsics.readCallerStackPointer();
151151

152-
if (LoomSupport.isEnabled() && (this.contScope != null || JavaThreads.isVirtual(thread))) {
152+
if (LoomSupport.isEnabled() && (this.contScope != null || VirtualThreads.get().isVirtual(thread))) {
153153
// has a delimitation scope
154154
Target_java_lang_ContinuationScope delimitationScope = this.contScope != null ? this.contScope : Target_java_lang_VirtualThread.continuationScope();
155155
Target_java_lang_Continuation topContinuation = Target_java_lang_Continuation.getCurrentContinuation(delimitationScope);

substratevm/src/com.oracle.svm.core/src/com/oracle/svm/core/thread/ContinuationLockSupportSubstitutions.java

Lines changed: 15 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -43,8 +43,8 @@ final class Target_java_util_concurrent_locks_LockSupport {
4343
@Substitute
4444
static void unpark(Thread thread) {
4545
if (thread != null) {
46-
if (thread instanceof VirtualThread) {
47-
((VirtualThread) thread).unpark(); // can throw RejectedExecutionException
46+
if (VirtualThreads.get().isVirtual(thread)) {
47+
VirtualThreads.get().unpark(thread);
4848
} else {
4949
U.unpark(thread);
5050
}
@@ -56,8 +56,8 @@ static void park(Object blocker) {
5656
Thread t = Thread.currentThread();
5757
setBlocker(t, blocker);
5858
try {
59-
if (t instanceof VirtualThread) {
60-
((VirtualThread) t).park();
59+
if (VirtualThreads.get().isVirtual(t)) {
60+
VirtualThreads.get().park();
6161
} else {
6262
U.park(false, 0L);
6363
}
@@ -72,8 +72,8 @@ static void parkNanos(Object blocker, long nanos) {
7272
Thread t = Thread.currentThread();
7373
setBlocker(t, blocker);
7474
try {
75-
if (t instanceof VirtualThread) {
76-
((VirtualThread) t).parkNanos(nanos);
75+
if (VirtualThreads.get().isVirtual(t)) {
76+
VirtualThreads.get().parkNanos(nanos);
7777
} else {
7878
U.park(false, nanos);
7979
}
@@ -88,8 +88,8 @@ static void parkUntil(Object blocker, long deadline) {
8888
Thread t = Thread.currentThread();
8989
setBlocker(t, blocker);
9090
try {
91-
if (t instanceof VirtualThread) {
92-
((VirtualThread) t).parkUntil(deadline);
91+
if (VirtualThreads.get().isVirtual(t)) {
92+
VirtualThreads.get().parkUntil(deadline);
9393
} else {
9494
U.park(true, deadline);
9595
}
@@ -100,8 +100,8 @@ static void parkUntil(Object blocker, long deadline) {
100100

101101
@Substitute
102102
static void park() {
103-
if (Thread.currentThread() instanceof VirtualThread) {
104-
((VirtualThread) Thread.currentThread()).park();
103+
if (VirtualThreads.get().isVirtual(Thread.currentThread())) {
104+
VirtualThreads.get().park();
105105
} else {
106106
U.park(false, 0L);
107107
}
@@ -110,8 +110,9 @@ static void park() {
110110
@Substitute
111111
public static void parkNanos(long nanos) {
112112
if (nanos > 0) {
113-
if (Thread.currentThread() instanceof VirtualThread) {
114-
((VirtualThread) Thread.currentThread()).parkNanos(nanos);
113+
if (VirtualThreads.get().isVirtual(Thread.currentThread())) {
114+
VirtualThreads.get().parkNanos(nanos);
115+
((SubstrateVirtualThread) Thread.currentThread()).parkNanos(nanos);
115116
} else {
116117
U.park(false, nanos);
117118
}
@@ -120,8 +121,8 @@ public static void parkNanos(long nanos) {
120121

121122
@Substitute
122123
public static void parkUntil(long deadline) {
123-
if (Thread.currentThread() instanceof VirtualThread) {
124-
((VirtualThread) Thread.currentThread()).parkUntil(deadline);
124+
if (VirtualThreads.get().isVirtual(Thread.currentThread())) {
125+
VirtualThreads.get().parkUntil(deadline);
125126
} else {
126127
U.park(true, deadline);
127128
}

substratevm/src/com.oracle.svm.core/src/com/oracle/svm/core/thread/Continuations.java

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,29 +24,11 @@
2424
*/
2525
package com.oracle.svm.core.thread;
2626

27-
import java.util.concurrent.ForkJoinPool;
28-
import java.util.concurrent.ForkJoinWorkerThread;
2927
import java.util.concurrent.ThreadFactory;
3028

31-
/** Continuation implementation <em>independent of</em> Project Loom. */
3229
public final class Continuations {
33-
private static final Thread.UncaughtExceptionHandler UNCAUGHT_EXCEPTION_HANDLER = (t, e) -> {
34-
};
35-
36-
/**
37-
* A pool with the maximum number of threads so we can likely start a platform thread for each
38-
* virtual thread, which we might need when blocking I/O does not yield.
39-
*/
40-
static final ForkJoinPool SCHEDULER = new ForkJoinPool(32767, CarrierThread::new, UNCAUGHT_EXCEPTION_HANDLER, true);
41-
4230
public static ThreadFactory virtualThreadFactory() {
43-
return VirtualThread::new;
44-
}
45-
46-
private static final class CarrierThread extends ForkJoinWorkerThread {
47-
CarrierThread(ForkJoinPool pool) {
48-
super(pool);
49-
}
31+
return VirtualThreads.get().createFactory();
5032
}
5133

5234
private Continuations() {
Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,10 +22,11 @@
2222
* or visit www.oracle.com if you need additional information or have any
2323
* questions.
2424
*/
25-
package com.oracle.svm.hosted.thread;
25+
package com.oracle.svm.core.thread;
2626

2727
import java.util.concurrent.ForkJoinPool;
2828

29+
import org.graalvm.nativeimage.ImageSingletons;
2930
import org.graalvm.nativeimage.Platform;
3031
import org.graalvm.nativeimage.Platforms;
3132
import org.graalvm.nativeimage.hosted.Feature;
@@ -36,8 +37,6 @@
3637
import com.oracle.svm.core.heap.StoredContinuation;
3738
import com.oracle.svm.core.heap.StoredContinuationImpl;
3839
import com.oracle.svm.core.option.SubstrateOptionsParser;
39-
import com.oracle.svm.core.thread.JavaContinuations;
40-
import com.oracle.svm.core.thread.LoomSupport;
4140
import com.oracle.svm.core.util.UserError;
4241
import com.oracle.svm.util.ReflectionUtil;
4342

@@ -46,8 +45,19 @@
4645
public class ContinuationsFeature implements Feature {
4746
@Override
4847
public void afterRegistration(AfterRegistrationAccess access) {
49-
UserError.guarantee(!SubstrateOptions.UseLoom.getValue(), SubstrateOptionsParser.commandArgument(SubstrateOptions.UseLoom, "+") + " cannot be enabled without option " +
50-
SubstrateOptionsParser.commandArgument(SubstrateOptions.SupportContinuations, "+"));
48+
VirtualThreads impl;
49+
if (JavaContinuations.isSupported()) {
50+
if (LoomSupport.isEnabled()) {
51+
impl = new LoomVirtualThreads();
52+
} else {
53+
impl = new SubstrateVirtualThreads();
54+
}
55+
} else {
56+
impl = new NoVirtualThreads();
57+
UserError.guarantee(!SubstrateOptions.UseLoom.getValue(), SubstrateOptionsParser.commandArgument(SubstrateOptions.UseLoom, "+") + " cannot be enabled without option " +
58+
SubstrateOptionsParser.commandArgument(SubstrateOptions.SupportContinuations, "+"));
59+
}
60+
ImageSingletons.add(VirtualThreads.class, impl);
5161
}
5262

5363
@Override
@@ -63,7 +73,8 @@ public void beforeAnalysis(BeforeAnalysisAccess access) {
6373
"Continuation support is used, but not enabled. Use options " +
6474
SubstrateOptionsParser.commandArgument(SubstrateOptions.SupportContinuations, "+") +
6575
" or " + SubstrateOptionsParser.commandArgument(SubstrateOptions.UseLoom, "+") + "."),
66-
StoredContinuationImpl.class);
76+
StoredContinuationImpl.class,
77+
ReflectionUtil.lookupMethod(NoVirtualThreads.class, "unreachable"));
6778
}
6879
}
6980
}

substratevm/src/com.oracle.svm.core/src/com/oracle/svm/core/thread/JavaThreads.java

Lines changed: 27 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@
2626

2727
import static com.oracle.svm.core.SubstrateOptions.MultiThreaded;
2828
import static com.oracle.svm.core.snippets.KnownIntrinsics.readCallerStackPointer;
29-
import static java.util.concurrent.TimeUnit.MILLISECONDS;
3029

3130
import java.lang.Thread.UncaughtExceptionHandler;
3231
import java.util.ArrayList;
@@ -58,6 +57,7 @@
5857
import com.oracle.svm.core.SubstrateDiagnostics;
5958
import com.oracle.svm.core.SubstrateOptions;
6059
import com.oracle.svm.core.SubstrateUtil;
60+
import com.oracle.svm.core.annotate.AlwaysInline;
6161
import com.oracle.svm.core.annotate.NeverInline;
6262
import com.oracle.svm.core.annotate.Uninterruptible;
6363
import com.oracle.svm.core.heap.Heap;
@@ -192,8 +192,8 @@ static boolean isInterrupted(Thread thread) {
192192
}
193193

194194
static boolean getAndClearInterrupt(Thread thread) {
195-
if (JavaContinuations.isSupported() && thread instanceof VirtualThread) {
196-
return ((VirtualThread) thread).getAndClearInterrupt();
195+
if (isVirtual(thread)) {
196+
return VirtualThreads.get().getAndClearInterrupt(thread);
197197
}
198198

199199
/*
@@ -207,7 +207,7 @@ static boolean getAndClearInterrupt(Thread thread) {
207207
}
208208

209209
static boolean platformGetAndUpdateInterrupt(Thread thread, boolean newValue) {
210-
assert !JavaContinuations.isSupported() || !(thread instanceof VirtualThread);
210+
assert !isVirtual(thread);
211211
Target_java_lang_Thread tjlt = toTarget(thread);
212212
boolean oldValue;
213213
if (JavaVersionUtil.JAVA_SPEC <= 11) {
@@ -262,11 +262,18 @@ public static Thread fromVMThread(IsolateThread thread) {
262262
return platformThread.get(thread);
263263
}
264264

265-
public static boolean isVirtual(Thread thread) {
266-
if (!LoomSupport.isEnabled()) {
265+
@AlwaysInline("Inline checks.")
266+
private static boolean isVirtual(Thread thread) {
267+
return VirtualThreads.get().isVirtual(thread);
268+
}
269+
270+
@AlwaysInline("Inline checks.")
271+
private static boolean isVirtualDisallowLoom(Thread thread) {
272+
if (LoomSupport.isEnabled()) {
273+
assert !isVirtual(thread) : "should not see Loom virtual thread objects here";
267274
return false;
268275
}
269-
return toTarget(thread).isVirtual();
276+
return isVirtual(thread);
270277
}
271278

272279
/**
@@ -306,11 +313,8 @@ static void join(Thread thread, long millis) throws InterruptedException {
306313
if (millis < 0) {
307314
throw new IllegalArgumentException("timeout value is negative");
308315
}
309-
if (JavaContinuations.isSupported() && thread instanceof VirtualThread) {
310-
if (thread.isAlive()) {
311-
long nanos = MILLISECONDS.toNanos(millis);
312-
((VirtualThread) thread).joinNanos(nanos);
313-
}
316+
if (isVirtual(thread)) {
317+
VirtualThreads.get().join(thread, millis);
314318
return;
315319
}
316320

@@ -696,8 +700,8 @@ protected void beforeThreadRun(@SuppressWarnings("unused") Thread thread) {
696700
protected abstract void platformYield();
697701

698702
void yield() {
699-
if (JavaContinuations.isSupported() && Thread.currentThread() instanceof VirtualThread) {
700-
((VirtualThread) Thread.currentThread()).tryYield();
703+
if (isVirtualDisallowLoom(Thread.currentThread())) {
704+
VirtualThreads.get().yield();
701705
} else {
702706
platformYield();
703707
}
@@ -837,7 +841,7 @@ static void initializeNewThread(
837841
static void platformPark() {
838842
VMOperationControl.guaranteeOkayToBlock("[JavaThreads.park(): Should not park when it is not okay to block.]");
839843
final Thread thread = Thread.currentThread();
840-
assert !JavaContinuations.isSupported() || !(thread instanceof VirtualThread);
844+
assert !isVirtual(thread);
841845
if (isInterrupted(thread)) { // avoid state changes and synchronization
842846
return;
843847
}
@@ -861,7 +865,7 @@ static void platformPark() {
861865
static void platformPark(long delayNanos) {
862866
VMOperationControl.guaranteeOkayToBlock("[JavaThreads.park(long): Should not park when it is not okay to block.]");
863867
final Thread thread = Thread.currentThread();
864-
assert !JavaContinuations.isSupported() || !(thread instanceof VirtualThread);
868+
assert !isVirtual(thread);
865869
if (isInterrupted(thread)) { // avoid state changes and synchronization
866870
return;
867871
}
@@ -887,7 +891,7 @@ static void platformPark(long delayNanos) {
887891
* @see #platformPark(long)
888892
*/
889893
static void platformUnpark(Thread thread) {
890-
assert !JavaContinuations.isSupported() || !(thread instanceof VirtualThread);
894+
assert !isVirtual(thread);
891895
ensureUnsafeParkEvent(thread).unpark();
892896
}
893897

@@ -897,9 +901,8 @@ private static ParkEvent ensureUnsafeParkEvent(Thread thread) {
897901
}
898902

899903
static void sleep(long millis) throws InterruptedException {
900-
if (JavaContinuations.isSupported() && Thread.currentThread() instanceof VirtualThread) {
901-
long nanos = TimeUnit.NANOSECONDS.convert(millis, TimeUnit.MILLISECONDS);
902-
((VirtualThread) Thread.currentThread()).sleepNanos(nanos);
904+
if (isVirtualDisallowLoom(Thread.currentThread())) {
905+
VirtualThreads.get().sleepMillis(millis);
903906
} else {
904907
platformSleep(millis);
905908
}
@@ -947,23 +950,22 @@ private static void platformSleep0(long delayNanos) {
947950
* @see #platformSleep(long)
948951
*/
949952
static void platformInterrupt(Thread thread) {
950-
assert !JavaContinuations.isSupported() || !(thread instanceof VirtualThread);
953+
assert !isVirtual(Thread.currentThread());
951954
final ParkEvent sleepEvent = JavaThreads.getSleepParkEvent(thread).get();
952955
if (sleepEvent != null) {
953956
sleepEvent.unpark();
954957
}
955958
}
956959

957960
static boolean isAlive(Thread thread) {
958-
if (JavaContinuations.isSupported() && thread instanceof VirtualThread) {
959-
Thread.State state = thread.getState();
960-
return !(state == Thread.State.NEW || state == Thread.State.TERMINATED);
961+
if (isVirtualDisallowLoom(thread)) {
962+
return VirtualThreads.get().isAlive(thread);
961963
}
962964
return platformIsAlive(thread);
963965
}
964966

965967
static boolean platformIsAlive(Thread thread) {
966-
assert !JavaContinuations.isSupported() || !(thread instanceof VirtualThread);
968+
assert !isVirtual(Thread.currentThread());
967969
int threadStatus = LoomSupport.CompatibilityUtil.getThreadStatus(toTarget(thread));
968970
return !(threadStatus == ThreadStatus.NEW || threadStatus == ThreadStatus.TERMINATED);
969971
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright (c) 2021, 2021, Oracle and/or its affiliates. All rights reserved.
3+
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
4+
*
5+
* This code is free software; you can redistribute it and/or modify it
6+
* under the terms of the GNU General Public License version 2 only, as
7+
* published by the Free Software Foundation. Oracle designates this
8+
* particular file as subject to the "Classpath" exception as provided
9+
* by Oracle in the LICENSE file that accompanied this code.
10+
*
11+
* This code is distributed in the hope that it will be useful, but WITHOUT
12+
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
13+
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
14+
* version 2 for more details (a copy is included in the LICENSE file that
15+
* accompanied this code).
16+
*
17+
* You should have received a copy of the GNU General Public License version
18+
* 2 along with this work; if not, write to the Free Software Foundation,
19+
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
20+
*
21+
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
22+
* or visit www.oracle.com if you need additional information or have any
23+
* questions.
24+
*/
25+
package com.oracle.svm.core.thread;
26+
27+
import static java.util.concurrent.TimeUnit.MILLISECONDS;
28+
29+
import java.util.concurrent.ThreadFactory;
30+
31+
import com.oracle.svm.core.SubstrateUtil;
32+
import com.oracle.svm.core.annotate.AlwaysInline;
33+
import com.oracle.svm.core.util.VMError;
34+
35+
/**
36+
* Code specific to virtual threads is part of the {@link Thread} methods, e.g. {@code yield} or
37+
* {@code sleep}, and the implementation for platform threads in {@code yield0} or {@code sleep0},
38+
* so we only substitute this platform thread code in {@link Target_java_lang_Thread}, and never
39+
* expect these methods to be reachable, therefore extending {@link NoVirtualThreads}.
40+
*/
41+
final class LoomVirtualThreads extends NoVirtualThreads {
42+
private static Target_java_lang_VirtualThread cast(Thread thread) {
43+
return SubstrateUtil.cast(thread, Target_java_lang_VirtualThread.class);
44+
}
45+
46+
@Override
47+
public ThreadFactory createFactory() {
48+
throw VMError.unimplemented();
49+
}
50+
51+
@AlwaysInline("Eliminate code handling virtual threads.")
52+
@Override
53+
public boolean isVirtual(Thread thread) {
54+
return Target_java_lang_VirtualThread.class.isInstance(thread);
55+
}
56+
57+
@Override
58+
public void join(Thread thread, long millis) throws InterruptedException {
59+
if (thread.isAlive()) {
60+
long nanos = MILLISECONDS.toNanos(millis);
61+
cast(thread).joinNanos(nanos);
62+
}
63+
}
64+
}

0 commit comments

Comments
 (0)