Skip to content

Commit 9cab6e7

Browse files
committed
[GR-54695] Fix setPendingFlagForVirtualThread() and cleanup TruffleSafepointTest
PullRequest: graal/18049
2 parents b364041 + 3a09da3 commit 9cab6e7

File tree

3 files changed

+103
-31
lines changed

3 files changed

+103
-31
lines changed

compiler/src/jdk.graal.compiler/src/jdk/graal/compiler/debug/ScopeImpl.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,7 +150,16 @@ private boolean isEmptyScope() {
150150
private boolean interceptDisabled;
151151

152152
ScopeImpl(DebugContext owner, Thread thread, boolean interceptDisabled) {
153-
this(owner, thread.getName(), null, false, interceptDisabled);
153+
this(owner, getThreadName(thread), null, false, interceptDisabled);
154+
}
155+
156+
private static String getThreadName(Thread thread) {
157+
String name = thread.getName();
158+
if (name.isEmpty()) { // the default for virtual threads
159+
return thread.toString();
160+
} else {
161+
return name;
162+
}
154163
}
155164

156165
private ScopeImpl(DebugContext owner, String unqualifiedName, ScopeImpl parent, boolean sandbox, boolean interceptDisabled, Object... context) {

truffle/src/com.oracle.truffle.api.test/src/com/oracle/truffle/api/test/TruffleSafepointTest.java

Lines changed: 49 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@
6767
import java.util.concurrent.Executors;
6868
import java.util.concurrent.Future;
6969
import java.util.concurrent.Semaphore;
70+
import java.util.concurrent.ThreadFactory;
7071
import java.util.concurrent.TimeUnit;
7172
import java.util.concurrent.TimeoutException;
7273
import java.util.concurrent.atomic.AtomicBoolean;
@@ -170,7 +171,15 @@ public static void afterClass() throws InterruptedException {
170171
@Before
171172
public void before() throws ExecutionException, InterruptedException {
172173
Assume.assumeFalse(vthreads && !canCreateVirtualThreads());
173-
this.service = vthreads ? Executors.newVirtualThreadPerTaskExecutor() : cachedThreadPool;
174+
if (vthreads) {
175+
ThreadFactory factory = Thread.ofVirtual().uncaughtExceptionHandler((thread, exception) -> {
176+
System.err.println("Uncaught exception in " + thread);
177+
exception.printStackTrace(System.err);
178+
}).factory();
179+
this.service = Executors.newThreadPerTaskExecutor(factory);
180+
} else {
181+
this.service = cachedThreadPool;
182+
}
174183

175184
ProxyLanguage.setDelegate(new ProxyLanguage() {
176185
@Override
@@ -679,9 +688,20 @@ protected void perform(Access access) {
679688

680689
@TruffleBoundary
681690
private static void sleepNanosBoundary(int nanos) {
682-
try {
683-
Thread.sleep(0, nanos);
684-
} catch (InterruptedException ie) {
691+
long deadline = System.nanoTime() + nanos;
692+
long now;
693+
boolean interrupted = false;
694+
while ((now = System.nanoTime()) < deadline) {
695+
try {
696+
Thread.sleep(0, (int) (deadline - now));
697+
break;
698+
} catch (InterruptedException e) {
699+
// cancellation uses Thread#interrupt()
700+
interrupted = true;
701+
}
702+
}
703+
if (interrupted) {
704+
Thread.currentThread().interrupt();
685705
}
686706
}
687707

@@ -1230,6 +1250,11 @@ public void testContextSafepoint() {
12301250
contextSafepoint();
12311251
reschedule();
12321252
}
1253+
/*
1254+
* There is a poll() after this while loop, on the return from the CallTarget. That
1255+
* ensures all ThreadLocalActions are processed without needing to wait for their
1256+
* futures. This also helped to find some bugs.
1257+
*/
12331258
return true;
12341259
})) {
12351260
AtomicInteger eventCounter = new AtomicInteger();
@@ -1697,12 +1722,16 @@ void forEachConfig(TestRunner run) {
16971722
}
16981723

16991724
// asynchronous execution of all configs
1700-
if (RERUN_THREAD_CONFIG_ASYNC) {
1725+
/*
1726+
* this is not done for virtual threads because it would cause to run more virtual threads
1727+
* concurrently than the number of processors (see above).
1728+
*/
1729+
if (RERUN_THREAD_CONFIG_ASYNC && !vthreads) {
17011730
List<Future<?>> futures = new ArrayList<>();
17021731
for (int threads : threadConfigs) {
17031732
for (int events : ITERATION_CONFIGS) {
1704-
if (threads * events <= MAX_THREAD_ITERATIONS_PRODUCT && !(vthreads && threads > processors)) {
1705-
futures.add(service.submit(() -> run.run(threads, events)));
1733+
if (threads * events <= MAX_THREAD_ITERATIONS_PRODUCT) {
1734+
futures.add(cachedThreadPool.submit(() -> run.run(threads, events)));
17061735
}
17071736
}
17081737
}
@@ -1931,13 +1960,20 @@ protected Context createTestContext() {
19311960
@TruffleBoundary
19321961
private static void waitForLatch(CountDownLatch latch) throws AssertionError {
19331962
latch.countDown();
1934-
try {
1935-
latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
1936-
} catch (InterruptedException e) {
1963+
boolean interrupted = false;
1964+
while (true) {
1965+
try {
1966+
if (!latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
1967+
throw failTimeout("waitForLatch()", null);
1968+
}
1969+
break;
1970+
} catch (InterruptedException e) {
1971+
// cancellation uses Thread#interrupt()
1972+
interrupted = true;
1973+
}
19371974
}
1938-
try {
1939-
latch.await(TIMEOUT_SECONDS, TimeUnit.SECONDS);
1940-
} catch (InterruptedException e) {
1975+
if (interrupted) {
1976+
Thread.currentThread().interrupt();
19411977
}
19421978
}
19431979

truffle/src/com.oracle.truffle.runtime/src/com/oracle/truffle/runtime/hotspot/HotSpotThreadLocalHandshake.java

Lines changed: 44 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -92,11 +92,6 @@ static void doHandshake(Object node) {
9292
SINGLETON.processHandshake((Node) node);
9393
}
9494

95-
@Override
96-
protected void setFastPending(Thread t) {
97-
setVolatile(t, 1);
98-
}
99-
10095
@Override
10196
@TruffleBoundary
10297
public TruffleSafepointImpl getCurrent() {
@@ -110,21 +105,24 @@ public TruffleSafepointImpl getCurrent() {
110105

111106
@Override
112107
protected void clearFastPending() {
113-
setVolatile(Thread.currentThread(), 0);
108+
Thread carrierThread = JAVA_LANG_ACCESS.currentCarrierThread();
109+
long eetop = UNSAFE.getLongVolatile(carrierThread, THREAD_EETOP_OFFSET);
110+
UNSAFE.putIntVolatile(null, eetop + PENDING_OFFSET, 0);
114111
}
115112

116-
private static void setVolatile(Thread thread, int value) {
113+
@Override
114+
protected void setFastPending(Thread thread) {
117115
/*
118116
* The thread will not go away here because the Truffle implementation ensures that this
119117
* method is no longer used if the thread is no longer active. It only sets this state for
120118
* contexts that are currently entered on a thread. Being entered implies that the thread is
121119
* active.
122120
*/
123-
assert thread.isAlive() : "thread must remain alive while setting fast pending";
121+
assert thread.isAlive() : "thread must remain alive while setting the pending flag";
124122

125123
long eetop = UNSAFE.getLongVolatile(thread, THREAD_EETOP_OFFSET);
126124
if (eetop != 0) {
127-
UNSAFE.putIntVolatile(null, eetop + PENDING_OFFSET, value);
125+
UNSAFE.putIntVolatile(null, eetop + PENDING_OFFSET, 1);
128126
} else { // only the case for VirtualThreads
129127
Object carrierThread = UNSAFE.getObjectVolatile(thread, THREAD_CARRIER_THREAD_OFFSET);
130128
if (carrierThread == null) {
@@ -137,7 +135,7 @@ private static void setVolatile(Thread thread, int value) {
137135
return;
138136
}
139137
eetop = UNSAFE.getLongVolatile(carrierThread, THREAD_EETOP_OFFSET);
140-
UNSAFE.putIntVolatile(null, eetop + PENDING_OFFSET, value);
138+
UNSAFE.putIntVolatile(null, eetop + PENDING_OFFSET, 1);
141139
/*
142140
* If the VirtualThread moves to another carrier thread after this method returns, the
143141
* pending flag will still be set correctly thanks to setPendingFlagForVirtualThread()
@@ -150,15 +148,44 @@ private static void setVolatile(Thread thread, int value) {
150148
static void setPendingFlagForVirtualThread() {
151149
TruffleSafepointImpl safepoint = STATE.get();
152150
if (safepoint != null) {
153-
boolean pending = safepoint.isFastPendingSet();
154-
155-
// VirtualThread#carrierThread is not set yet, it set after this hook is called.
156-
// However, Thread.currentCarrierThread() is already set so we can use that.
157-
// We could also get the carrier thread from the hook arguments but that seems more
158-
// expensive.
159151
Thread carrierThread = JAVA_LANG_ACCESS.currentCarrierThread();
160152
long eetop = UNSAFE.getLongVolatile(carrierThread, THREAD_EETOP_OFFSET);
161-
UNSAFE.putIntVolatile(null, eetop + PENDING_OFFSET, pending ? 1 : 0);
153+
154+
/*
155+
* When this method and setFastPending() are run concurrently, there is a possibility
156+
* that we set the carrier pending flag to 0 here based on pendingBefore==false, after
157+
* setFastPending() sets the flag to 1 (which would lose the flag):
158+
*
159+
* @formatter:off
160+
* with VT=VirtualThread MT=the thread that submits the ThreadLocalAction:
161+
* VT: boolean pending = this.fastPendingSet; // false
162+
* MT: fastPendingSet = true;
163+
* MT: carrierThread.pending = true;
164+
* VT: carrierThread.pending = pending; // false
165+
* @formatter:on
166+
*
167+
* So we check for that case with pendingAfter and fix it. This is correct because
168+
* either we wrote the 0 before setFastPending() wrote the 1 (harmless), or we wrote it
169+
* after and then we must see pendingAfter==true, because (the caller of)
170+
* setFastPending() sets fastPendingSet before writing to the carrier pending flag.
171+
*
172+
* If this method and setFastPending() are not run concurrently, then it is as if both
173+
* methods were executed one after another (since all accesses are volatile and so
174+
* sequentially consistent) and it works trivially.
175+
*
176+
* This solution is better than `if (isFastPendingSet()) { carrierThread.pending = 1; }`
177+
* because that can leave the carrier flag as true after an unmount of a virtual thread
178+
* which did not process safepoints yet and the newly-mounted virtual thread would not
179+
* clear it, causing extra stub calls (clearing is only done if fastPendingSet is true,
180+
* necessary for DefaultThreadLocalHandshake where clearFastPending() is not
181+
* idempotent).
182+
*/
183+
boolean pendingBefore = safepoint.isFastPendingSet();
184+
UNSAFE.putIntVolatile(null, eetop + PENDING_OFFSET, pendingBefore ? 1 : 0);
185+
boolean pendingAfter = safepoint.isFastPendingSet();
186+
if (!pendingBefore && pendingAfter) {
187+
UNSAFE.putIntVolatile(null, eetop + PENDING_OFFSET, 1);
188+
}
162189
}
163190
}
164191

0 commit comments

Comments
 (0)