Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,6 @@

package jdk.internal.foreign;

import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;
import java.lang.ref.Cleaner;

import jdk.internal.vm.annotation.ForceInline;

/**
* A confined session, which features an owner thread. The liveness check features an additional
* confinement check - that is, calling any operation on this session from a thread other than the
Expand All @@ -39,55 +33,25 @@
*/
final class ConfinedSession extends MemorySessionImpl {

private int asyncReleaseCount = 0;

static final VarHandle ASYNC_RELEASE_COUNT;

static {
try {
ASYNC_RELEASE_COUNT = MethodHandles.lookup().findVarHandle(ConfinedSession.class, "asyncReleaseCount", int.class);
} catch (Throwable ex) {
throw new ExceptionInInitializerError(ex);
}
}

public ConfinedSession(Thread owner) {
super(owner, new ConfinedResourceList());
}

@Override
@ForceInline
public void acquire0() {
checkValidState();
if (state == MAX_FORKS) {
throw tooManyAcquires();
}
state++;
assertIsAccessibleByCurrentThread();
super.acquire0();
}

@Override
@ForceInline
public void release0() {
if (Thread.currentThread() == owner) {
state--;
} else {
// It is possible to end up here in two cases: this session was kept alive by some other confined session
// which is implicitly released (in which case the release call comes from the cleaner thread). Or,
// this session might be kept alive by a shared session, which means the release call can come from any
// thread.
ASYNC_RELEASE_COUNT.getAndAdd(this, 1);
}
}
// It is possible to that release0 is called by another thread: this session was kept alive by some other confined session
// which is implicitly released (in which case the release call comes from the cleaner thread). Or,
// this session might be kept alive by a shared session, which means the release call can come from any
// thread. Hence, release0() is not checked for thread usage

@Override
void justClose() {
checkValidState();
int asyncCount = (int)ASYNC_RELEASE_COUNT.getVolatile(this);
if ((state == 0 && asyncCount == 0)
|| ((state - asyncCount) == 0)) {
state = CLOSED;
} else {
throw alreadyAcquired(state - asyncCount);
}
assertIsAccessibleByCurrentThread();
super.justClose();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
import jdk.internal.misc.ScopedMemoryAccess;
import jdk.internal.vm.annotation.ForceInline;

import static jdk.internal.foreign.MappedMemorySegmentImpl.SCOPED_MEMORY_ACCESS;

/**
* This class manages the temporal bounds associated with a memory segment as well
* as thread confinement. A session has a liveness bit, which is updated when the session is closed
Expand Down Expand Up @@ -76,7 +78,7 @@ public abstract sealed class MemorySessionImpl
}
}

public void addCloseAction(Runnable runnable) {
public final void addCloseAction(Runnable runnable) {
Objects.requireNonNull(runnable);
addInternal(ResourceList.ResourceCleanup.ofRunnable(runnable));
}
Expand All @@ -91,7 +93,7 @@ public void addCloseAction(Runnable runnable) {
* new segment to the client). For this reason, it's not worth adding extra complexity to the segment
* initialization logic here - and using an optimistic logic works well in practice.
*/
public void addOrCleanupIfFail(ResourceList.ResourceCleanup resource) {
public final void addOrCleanupIfFail(ResourceList.ResourceCleanup resource) {
try {
addInternal(resource);
} catch (Throwable ex) {
Expand Down Expand Up @@ -129,17 +131,40 @@ public static MemorySessionImpl createImplicit(Cleaner cleaner) {
}

@Override
public MemorySegment allocate(long byteSize, long byteAlignment) {
public final MemorySegment allocate(long byteSize, long byteAlignment) {
Utils.checkAllocationSizeAndAlign(byteSize, byteAlignment);
return NativeMemorySegmentImpl.makeNativeSegment(byteSize, byteAlignment, this);
}

public abstract void release0();
@ForceInline
public void release0() {
int value;
do {
value = (int)STATE.getVolatile(this);
if (value <= OPEN) {
//cannot get here - we can't close segment twice
throw alreadyClosed();
}
} while (!STATE.compareAndSet(this, value, value - 1));
}

public abstract void acquire0();
@ForceInline
public void acquire0() {
int value;
do {
value = (int)STATE.getVolatile(this);
if (value < OPEN) {
//segment is not open!
throw alreadyClosed();
} else if (value == MAX_FORKS) {
//overflow
throw tooManyAcquires();
}
} while (!STATE.compareAndSet(this, value, value + 1));
}

@Override
public void whileAlive(Runnable action) {
public final void whileAlive(Runnable action) {
Objects.requireNonNull(action);
acquire0();
try {
Expand Down Expand Up @@ -168,8 +193,9 @@ public final boolean isAccessibleBy(Thread thread) {
* Returns true, if this session is still open. This method may be called in any thread.
* @return {@code true} if this session is not closed yet.
*/
public boolean isAlive() {
return state >= OPEN;
@Override
public final boolean isAlive() {
return state() >= OPEN;
}

/**
Expand All @@ -181,21 +207,27 @@ public boolean isAlive() {
* please use {@link #checkValidState()}.
*/
@ForceInline
public void checkValidStateRaw() {
public final void checkValidStateRaw() {
if (owner != null && owner != Thread.currentThread()) {
throw WRONG_THREAD;
}
if (state < OPEN) {
if (!isAlive()) {
throw ALREADY_CLOSED;
}
}

void assertIsAccessibleByCurrentThread() {
if (owner != null && owner != Thread.currentThread()) {
throw wrongThread();
}
}

/**
* Checks that this session is still alive (see {@link #isAlive()}).
* @throws IllegalStateException if this session is already closed or if this is
* a confined session and this method is called outside of the owner thread.
*/
public void checkValidState() {
public final void checkValidState() {
try {
checkValidStateRaw();
} catch (ScopedMemoryAccess.ScopedAccessError error) {
Expand All @@ -204,7 +236,7 @@ public void checkValidState() {
}

@Override
protected Object clone() throws CloneNotSupportedException {
protected final Object clone() throws CloneNotSupportedException {
throw new CloneNotSupportedException();
}

Expand All @@ -217,12 +249,28 @@ public boolean isCloseable() {
* @throws IllegalStateException if this session is already closed or if this is
* a confined session and this method is called outside of the owner thread.
*/
public void close() {
public final void close() {
justClose();
resourceList.cleanup();
}

abstract void justClose();
void justClose() {
int prevState = (int) STATE.compareAndExchange(this, OPEN, CLOSING);
if (prevState < 0) {
throw alreadyClosed();
} else if (prevState != OPEN) {
throw alreadyAcquired(prevState);
}
boolean success = SCOPED_MEMORY_ACCESS.closeScope(this);
STATE.setVolatile(this, success ? CLOSED : OPEN);
if (!success) {
throw alreadyAcquired(1);
}
}

private int state() {
return (int)STATE.get(this);
}

public static MemorySessionImpl heapSession(Object ref) {
return new GlobalSession(ref);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,49 +48,6 @@ sealed class SharedSession extends MemorySessionImpl permits ImplicitSession {
super(null, new SharedResourceList());
}

@Override
@ForceInline
public void acquire0() {
int value;
do {
value = (int) STATE.getVolatile(this);
if (value < OPEN) {
//segment is not open!
throw alreadyClosed();
} else if (value == MAX_FORKS) {
//overflow
throw tooManyAcquires();
}
} while (!STATE.compareAndSet(this, value, value + 1));
}

@Override
@ForceInline
public void release0() {
int value;
do {
value = (int) STATE.getVolatile(this);
if (value <= OPEN) {
//cannot get here - we can't close segment twice
throw alreadyClosed();
}
} while (!STATE.compareAndSet(this, value, value - 1));
}

void justClose() {
int prevState = (int) STATE.compareAndExchange(this, OPEN, CLOSING);
if (prevState < 0) {
throw alreadyClosed();
} else if (prevState != OPEN) {
throw alreadyAcquired(prevState);
}
boolean success = SCOPED_MEMORY_ACCESS.closeScope(this);
STATE.setVolatile(this, success ? CLOSED : OPEN);
if (!success) {
throw alreadyAcquired(1);
}
}

/**
* A shared resource list; this implementation has to handle add vs. add races, as well as add vs. cleanup races.
*/
Expand Down
47 changes: 47 additions & 0 deletions test/jdk/java/foreign/TestMemorySession.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,15 @@
import jdk.internal.foreign.MemorySessionImpl;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import static org.testng.Assert.*;

import java.lang.management.MemoryPoolMXBean;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
Expand Down Expand Up @@ -201,6 +206,14 @@ public void testCloseEmptySharedSession() {
Arena.openShared().close();
}

@Test
public void testCloseConfinedLockInSameThread() {
Arena arena = Arena.openConfined();
Arena handle = Arena.openConfined();
keepAlive(handle.session(), arena.session());
handle.close();
}

@Test
public void testCloseConfinedLock() {
Arena arena = Arena.openConfined();
Expand Down Expand Up @@ -302,6 +315,40 @@ public void testConfinedSessionWithSharedDependency() {
root.close();
}


// This tests that close operations are visible to other threads even though the original
// session is confined as per the .isAlive() contract.
@Test
public void testConfinedSessionIsAliveObservableFromAnotherThread() throws InterruptedException {
var begin = Instant.now();
for (int i = 0; i < 1_000; i++) {
var latch = new CountDownLatch(2);
Thread otherThread;
try (var arena = Arena.openConfined()) {
otherThread = new Thread(() -> {
MemorySession session = arena.session();
assertTrue(session.isAlive());
latch.countDown();
try {
latch.await();
} catch (InterruptedException e) {
throw new AssertionError(e);
}
boolean alive = session.isAlive();
assertFalse(alive);
});
otherThread.start();
while (latch.getCount() == 2) {
Thread.onSpinWait();
}
} // <- arena.close() invoked here
latch.countDown();
otherThread.join();
}
var end = Instant.now();
System.out.println("Completed in " + Duration.between(begin, end));
}

private void waitSomeTime() {
try {
Thread.sleep(10);
Expand Down