From 596ff7f7e38c68774ab86d2c5c3f09ec6627f2bb Mon Sep 17 00:00:00 2001 From: Martin Entlicher Date: Thu, 24 Jul 2025 18:39:12 +0200 Subject: [PATCH] Use system threads and polyglot threads for JDWP. That allows to join them on context finalize. (GR-67821) --- .../espresso/jdwp/api/JDWPContext.java | 17 ++++ .../jdwp/impl/DebuggerConnection.java | 79 +++++++++++++++---- .../jdwp/impl/DebuggerController.java | 24 +++--- .../espresso/runtime/JDWPContextImpl.java | 10 +++ truffle/CHANGELOG.md | 1 + .../api/test/LanguageSystemThreadTest.java | 42 +++++++++- .../oracle/truffle/api/TruffleLanguage.java | 4 +- .../truffle/polyglot/EngineAccessor.java | 6 +- .../oracle/truffle/polyglot/SystemThread.java | 2 +- 9 files changed, 153 insertions(+), 32 deletions(-) diff --git a/espresso/src/com.oracle.truffle.espresso.jdwp/src/com/oracle/truffle/espresso/jdwp/api/JDWPContext.java b/espresso/src/com.oracle.truffle.espresso.jdwp/src/com/oracle/truffle/espresso/jdwp/api/JDWPContext.java index 02c7871e2c54..885b3e928e03 100644 --- a/espresso/src/com.oracle.truffle.espresso.jdwp/src/com/oracle/truffle/espresso/jdwp/api/JDWPContext.java +++ b/espresso/src/com.oracle.truffle.espresso.jdwp/src/com/oracle/truffle/espresso/jdwp/api/JDWPContext.java @@ -27,6 +27,8 @@ import java.util.Set; import com.oracle.truffle.api.TruffleLanguage; +import com.oracle.truffle.api.TruffleLanguage.Env; +import com.oracle.truffle.api.TruffleThreadBuilder; import com.oracle.truffle.api.frame.Frame; import com.oracle.truffle.api.nodes.Node; import com.oracle.truffle.api.nodes.RootNode; @@ -221,6 +223,21 @@ public interface JDWPContext { */ Ids getIds(); + /** + * Creates a new system thread. + * + * @return {@link Env#createSystemThread(java.lang.Runnable)}. + */ + public Thread createSystemThread(Runnable runnable); + + /** + * Creates a new polyglot thread bound to the Espresso language context. + * + * @return {@link Env#newTruffleThreadBuilder(java.lang.Runnable)}.{@link TruffleThreadBuilder#build() + * build()}. + */ + public Thread createPolyglotThread(Runnable runnable); + /** * @param string guest language string object * @return true if object is a guest language String, false otherwise diff --git a/espresso/src/com.oracle.truffle.espresso.jdwp/src/com/oracle/truffle/espresso/jdwp/impl/DebuggerConnection.java b/espresso/src/com.oracle.truffle.espresso.jdwp/src/com/oracle/truffle/espresso/jdwp/impl/DebuggerConnection.java index aea471b54201..6aab39683089 100644 --- a/espresso/src/com.oracle.truffle.espresso.jdwp/src/com/oracle/truffle/espresso/jdwp/impl/DebuggerConnection.java +++ b/espresso/src/com.oracle.truffle.espresso.jdwp/src/com/oracle/truffle/espresso/jdwp/impl/DebuggerConnection.java @@ -24,9 +24,13 @@ import java.io.IOException; import java.net.Socket; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; +import com.oracle.truffle.api.TruffleSafepoint; import com.oracle.truffle.espresso.jdwp.api.ErrorCodes; import com.oracle.truffle.espresso.jdwp.api.JDWPContext; @@ -43,7 +47,8 @@ private DebuggerConnection(SocketConnection connection, DebuggerController contr } static void establishDebuggerConnection(DebuggerController controller, DebuggerController.SetupState setupState, boolean isReconnect, CountDownLatch startupLatch) { - Thread jdwpReceiver = new Thread(new JDWPReceiver(controller, setupState, isReconnect, startupLatch), "jdwp-receiver"); + Thread jdwpReceiver = controller.getContext().createSystemThread(new JDWPReceiver(controller, setupState, isReconnect, startupLatch)); + jdwpReceiver.setName("jdwp-receiver"); controller.addDebuggerReceiverThread(jdwpReceiver); jdwpReceiver.setDaemon(true); jdwpReceiver.start(); @@ -104,18 +109,14 @@ public void run() { private static class JDWPReceiver implements Runnable { - private static final Object NOT_ENTERED_MARKER = new Object(); private DebuggerController.SetupState setupState; private final DebuggerController controller; - private RequestedJDWPEvents requestedJDWPEvents; - private DebuggerConnection debuggerConnection; private final boolean isReconnect; private final CountDownLatch latch; JDWPReceiver(DebuggerController controller, DebuggerController.SetupState setupState, boolean isReconnect, CountDownLatch latch) { this.setupState = setupState; this.controller = controller; - this.requestedJDWPEvents = new RequestedJDWPEvents(controller); this.isReconnect = isReconnect; this.latch = latch; } @@ -123,6 +124,7 @@ private static class JDWPReceiver implements Runnable { @Override public void run() { // first, complete the connection setup which is potentially blocking + DebuggerConnection debuggerConnection; try { Socket connectionSocket; if (setupState.socket != null) { @@ -161,7 +163,8 @@ public void run() { } // OK, we're ready to fire up the JDWP transmitter thread too - Thread jdwpSender = new Thread(new JDWPSender(socketConnection), "jdwp-transmitter"); + Thread jdwpSender = controller.getContext().createSystemThread(new JDWPSender(socketConnection)); + jdwpSender.setName("jdwp-transmitter"); controller.addDebuggerSenderThread(jdwpSender); jdwpSender.setDaemon(true); jdwpSender.start(); @@ -190,16 +193,18 @@ public void run() { latch.countDown(); } // Now, begin processing packets when they start to flow from the debugger. - // Make sure this thread is entered in the context + final BlockingQueue packetQueue = new LinkedBlockingQueue<>(); + final AtomicBoolean processorClose = new AtomicBoolean(false); + Thread jdwpProcessor = controller.getContext().createPolyglotThread(new JDWPProcessor(controller, debuggerConnection, packetQueue, processorClose)); + jdwpProcessor.setName("jdwp-processor"); + controller.addDebuggerProcessorThread(jdwpProcessor); + jdwpProcessor.setDaemon(true); + jdwpProcessor.start(); try { while (!Thread.currentThread().isInterrupted() && !controller.isClosing()) { - Object previous = NOT_ENTERED_MARKER; try { - // get the packet outside the Truffle context, because it's a blocking IO - // operation Packet packet = Packet.fromByteArray(debuggerConnection.connection.readPacket()); - previous = controller.enterTruffleContext(); - processPacket(packet); + packetQueue.add(packet); } catch (IOException e) { if (!debuggerConnection.isOpen()) { // when the socket is closed, we're done @@ -207,19 +212,56 @@ public void run() { } if (!Thread.currentThread().isInterrupted()) { controller.warning(() -> "Failed to process jdwp packet with message: " + e.getMessage()); + Thread.currentThread().interrupt(); // And set the interrupt flag again } } catch (ConnectionClosedException e) { break; - } finally { - if (previous != NOT_ENTERED_MARKER) { - controller.leaveTruffleContext(previous); - } } } } finally { + processorClose.set(true); + jdwpProcessor.interrupt(); controller.getEventListener().onDetach(); } } + } + + private static class JDWPProcessor implements Runnable { + + private final DebuggerController controller; + private final DebuggerConnection debuggerConnection; + private final RequestedJDWPEvents requestedJDWPEvents; + private final BlockingQueue packetQueue; + private final AtomicBoolean close; + + private JDWPProcessor(DebuggerController controller, DebuggerConnection debuggerConnection, + BlockingQueue packetQueue, AtomicBoolean close) { + this.controller = controller; + this.debuggerConnection = debuggerConnection; + this.requestedJDWPEvents = new RequestedJDWPEvents(controller); + this.packetQueue = packetQueue; + this.close = close; + } + + @Override + public void run() { + while (!close.get()) { + Packet packet; + try { + packet = TruffleSafepoint.getCurrent().setBlockedFunction(null, TruffleSafepoint.Interrupter.THREAD_INTERRUPT, + BlockingQueue::take, packetQueue, () -> breakIfClosed(), null); + } catch (ProcessorClosedException ex) { + break; + } + processPacket(packet); + } + } + + private void breakIfClosed() { + if (close.get()) { + throw new ProcessorClosedException(); + } + } private void processPacket(Packet packet) { JDWPContext context = controller.getContext(); @@ -665,6 +707,11 @@ private void processPacket(Packet packet) { debuggerConnection.handleReply(packet, new CommandResult(reply)); } } + + private static class ProcessorClosedException extends RuntimeException { + + private static final long serialVersionUID = 8467327507834079474L; + } } private static CommandResult unknownCommandSet(Packet packet, DebuggerController controller) { diff --git a/espresso/src/com.oracle.truffle.espresso.jdwp/src/com/oracle/truffle/espresso/jdwp/impl/DebuggerController.java b/espresso/src/com.oracle.truffle.espresso.jdwp/src/com/oracle/truffle/espresso/jdwp/impl/DebuggerController.java index 6b98049428bf..97d5ba2ab77e 100644 --- a/espresso/src/com.oracle.truffle.espresso.jdwp/src/com/oracle/truffle/espresso/jdwp/impl/DebuggerController.java +++ b/espresso/src/com.oracle.truffle.espresso.jdwp/src/com/oracle/truffle/espresso/jdwp/impl/DebuggerController.java @@ -87,6 +87,7 @@ public final class DebuggerController implements ContextsListener { private JDWPContext context; private Thread senderThread; private Thread receiverThread; + private Thread processorThread; private volatile HandshakeController hsController = null; private final Lock resetting = new ReentrantLock(); private volatile boolean isClosing; @@ -169,7 +170,7 @@ public void reInitialize() { DebuggerConnection.establishDebuggerConnection(newController, newController.setupState, true, new CountDownLatch(1)); } - public void reset(boolean prepareForReconnect) { + private void reset(boolean prepareForReconnect) { if (isClosing) { // already done closing, so don't attempt anything further return; @@ -178,7 +179,6 @@ public void reset(boolean prepareForReconnect) { // mark that we're closing down the whole context isClosing = true; } - Thread currentReceiverThread = null; try { // begin section that needs to be synchronized with establishing a new connection and // starting the threads. The logic within the locked part, must be written in a way that @@ -190,8 +190,6 @@ public void reset(boolean prepareForReconnect) { // when resuming all threads endSession(); - currentReceiverThread = receiverThread; - // Close the server socket used to listen for transport dt_socket. // This will unblock the accept call on a server socket. HandshakeController hsc = hsController; @@ -225,9 +223,10 @@ public void reset(boolean prepareForReconnect) { resetting.unlock(); } - // If we're not running in the receiver thread we should join - if (Thread.currentThread() != currentReceiverThread) { - joinThread(currentReceiverThread); + joinThread(receiverThread); + // If we're not running in the processor thread we should join + if (Thread.currentThread() != processorThread) { + joinThread(processorThread); } if (prepareForReconnect && !isClosing && isServer()) { @@ -274,16 +273,23 @@ public void closeSocket() { } public void addDebuggerReceiverThread(Thread thread) { + assert receiverThread == null; receiverThread = thread; } + public void addDebuggerProcessorThread(Thread thread) { + assert processorThread == null; + processorThread = thread; + } + public void addDebuggerSenderThread(Thread thread) { + assert senderThread == null; senderThread = thread; } public boolean isDebuggerThread(Thread hostThread) { - // only the receiver thread enters the context - return hostThread == receiverThread; + // only the procesor thread enters the context + return hostThread == processorThread; } public void markLateStartupError(Throwable t) { diff --git a/espresso/src/com.oracle.truffle.espresso/src/com/oracle/truffle/espresso/runtime/JDWPContextImpl.java b/espresso/src/com.oracle.truffle.espresso/src/com/oracle/truffle/espresso/runtime/JDWPContextImpl.java index 542e5e5909da..e10540a189b0 100644 --- a/espresso/src/com.oracle.truffle.espresso/src/com/oracle/truffle/espresso/runtime/JDWPContextImpl.java +++ b/espresso/src/com.oracle.truffle.espresso/src/com/oracle/truffle/espresso/runtime/JDWPContextImpl.java @@ -124,6 +124,16 @@ public Ids getIds() { return ids; } + @Override + public Thread createSystemThread(Runnable runnable) { + return context.getEnv().createSystemThread(runnable); + } + + @Override + public Thread createPolyglotThread(Runnable runnable) { + return context.getEnv().newTruffleThreadBuilder(runnable).build(); + } + @Override public boolean isString(Object string) { return Meta.isString(string); diff --git a/truffle/CHANGELOG.md b/truffle/CHANGELOG.md index 31543f7038b4..22cda1e28fea 100644 --- a/truffle/CHANGELOG.md +++ b/truffle/CHANGELOG.md @@ -5,6 +5,7 @@ This changelog summarizes major changes between Truffle versions relevant to lan ## Version 26.0 * GR-65048: Introduced `InternalResource.OS.UNSUPPORTED` and `InternalResource.CPUArchitecture.UNSUPPORTED` to represent unsupported platforms. Execution on unsupported platforms must be explicitly enabled using the system property `-Dpolyglot.engine.allowUnsupportedPlatform=true`. If this property is not set, calls to `OS.getCurrent()` or `CPUArchitecture.getCurrent()` will throw an `IllegalStateException` when running on an unsupported platform. `InternalResource` implementations should handle the unsupported platform and describe possible steps in the error message on how to proceed. * GR-66839: Deprecate `Location#isFinal()` as it always returns false. +* GR-67821: `TruffleLanguage.Env#createSystemThread` is now allowed to be be called from a system thread now without an explicitly entered context. ## Version 25.0 * GR-31495 Added ability to specify language and instrument specific options using `Source.Builder.option(String, String)`. Languages may describe available source options by implementing `TruffleLanguage.getSourceOptionDescriptors()` and `TruffleInstrument.getSourceOptionDescriptors()` respectively. diff --git a/truffle/src/com.oracle.truffle.api.test/src/com/oracle/truffle/api/test/LanguageSystemThreadTest.java b/truffle/src/com.oracle.truffle.api.test/src/com/oracle/truffle/api/test/LanguageSystemThreadTest.java index 31e013a75cf3..b5148b3d0da1 100644 --- a/truffle/src/com.oracle.truffle.api.test/src/com/oracle/truffle/api/test/LanguageSystemThreadTest.java +++ b/truffle/src/com.oracle.truffle.api.test/src/com/oracle/truffle/api/test/LanguageSystemThreadTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2022, 2022, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2022, 2025, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * The Universal Permissive License (UPL), Version 1.0 @@ -251,8 +251,16 @@ protected Object execute(RootNode node, Env env, Object[] contextArguments, Obje @Test public void testCreateSystemThreadNotEntered() { try (Context context1 = Context.newBuilder().build()) { - String errorMessage = AbstractExecutableTestLanguage.evalTestLanguage(context1, CreateSystemThreadNotEnteredLanguage.class, "").asString(); - Assert.assertEquals("There is no current context available.", errorMessage); + String successMessage = AbstractExecutableTestLanguage.evalTestLanguage(context1, CreateSystemThreadNotEnteredLanguage.class, "").asString(); + Assert.assertEquals("OK", successMessage); + } + } + + @Test + public void testCreateNewThreadNotEntered() { + try (Context context1 = Context.newBuilder().build()) { + String errorMessage = AbstractExecutableTestLanguage.evalTestLanguage(context1, CreateNewThreadNotEnteredLanguage.class, "").asString(); + Assert.assertEquals("Not entered in an Env's context.", errorMessage); } } @@ -264,6 +272,34 @@ public static final class CreateSystemThreadNotEnteredLanguage extends AbstractE protected Object execute(RootNode node, Env env, Object[] contextArguments, Object[] frameArguments) throws Exception { AtomicReference throwableRef = new AtomicReference<>(); Thread t = env.createSystemThread(() -> { + try { + Thread systemT = env.createSystemThread(() -> { + }); + // Can create system thread from a system thread. + Assert.assertNotNull(systemT); + } catch (Throwable exception) { + throwableRef.set(exception); + } + }); + t.start(); + t.join(); + Throwable throwable = throwableRef.get(); + if (throwable != null) { + return throwable.getMessage(); + } else { + return "OK"; + } + } + } + + @Registration + public static final class CreateNewThreadNotEnteredLanguage extends AbstractExecutableTestLanguage { + + @Override + @TruffleBoundary + protected Object execute(RootNode node, Env env, Object[] contextArguments, Object[] frameArguments) throws Exception { + AtomicReference throwableRef = new AtomicReference<>(); + Thread t = new Thread(() -> { try { env.createSystemThread(() -> { }); diff --git a/truffle/src/com.oracle.truffle.api/src/com/oracle/truffle/api/TruffleLanguage.java b/truffle/src/com.oracle.truffle.api/src/com/oracle/truffle/api/TruffleLanguage.java index ffa99f8ac261..beafe581a90f 100644 --- a/truffle/src/com.oracle.truffle.api/src/com/oracle/truffle/api/TruffleLanguage.java +++ b/truffle/src/com.oracle.truffle.api/src/com/oracle/truffle/api/TruffleLanguage.java @@ -2016,8 +2016,8 @@ public Thread createSystemThread(Runnable runnable) { * {@link TruffleLanguage#initializeThread(Object, Thread) languages} or instruments' * thread-listeners. Creating a system thread does not cause a transition to multi-threaded * access. The {@link Env#isCreateThreadAllowed() creation permit} is not required to create - * a system thread, but the caller must be entered in a context to create a system thread, - * if not an {@link IllegalStateException} is thrown. + * a system thread. The caller must be either entered in a context, or in another system + * thread to create a new system thread. If not an {@link IllegalStateException} is thrown. *

* It is recommended to set an * {@link Thread#setUncaughtExceptionHandler(java.lang.Thread.UncaughtExceptionHandler) diff --git a/truffle/src/com.oracle.truffle.polyglot/src/com/oracle/truffle/polyglot/EngineAccessor.java b/truffle/src/com.oracle.truffle.polyglot/src/com/oracle/truffle/polyglot/EngineAccessor.java index b0877641910d..52a38b314cea 100644 --- a/truffle/src/com.oracle.truffle.polyglot/src/com/oracle/truffle/polyglot/EngineAccessor.java +++ b/truffle/src/com.oracle.truffle.polyglot/src/com/oracle/truffle/polyglot/EngineAccessor.java @@ -2119,8 +2119,12 @@ public Thread createInstrumentSystemThread(Object polyglotInstrument, Runnable r @Override public Thread createLanguageSystemThread(Object polyglotLanguageContext, Runnable runnable, ThreadGroup threadGroup) { PolyglotLanguageContext languageContext = (PolyglotLanguageContext) polyglotLanguageContext; + PolyglotContextImpl currentContext = PolyglotFastThreadLocals.getContext(null); + if (currentContext == null && Thread.currentThread() instanceof LanguageSystemThread systemThread) { + currentContext = systemThread.polyglotContext; + } // Ensure that thread is entered in correct context - if (PolyglotContextImpl.requireContext() != languageContext.context) { + if (currentContext != languageContext.context) { throw new IllegalStateException("Not entered in an Env's context."); } return new LanguageSystemThread(languageContext, runnable, threadGroup); diff --git a/truffle/src/com.oracle.truffle.polyglot/src/com/oracle/truffle/polyglot/SystemThread.java b/truffle/src/com.oracle.truffle.polyglot/src/com/oracle/truffle/polyglot/SystemThread.java index 06edee667008..c9b6c597e2c8 100644 --- a/truffle/src/com.oracle.truffle.polyglot/src/com/oracle/truffle/polyglot/SystemThread.java +++ b/truffle/src/com.oracle.truffle.polyglot/src/com/oracle/truffle/polyglot/SystemThread.java @@ -104,7 +104,7 @@ private void checkClosed() { static final class LanguageSystemThread extends SystemThread { final String languageId; - private final PolyglotContextImpl polyglotContext; + final PolyglotContextImpl polyglotContext; LanguageSystemThread(PolyglotLanguageContext polyglotLanguageContext, Runnable runnable, ThreadGroup threadGroup) { super(runnable, threadGroup, polyglotLanguageContext.context.engine.impl);