8343394: Make MemorySessionImpl.state a stable field

Co-authored-by: Maurizio Cimadamore <mcimadamore@openjdk.org>
Reviewed-by: mcimadamore, jvernee
This commit is contained in:
Quan Anh Mai 2024-11-07 14:32:18 +00:00
parent d2b681d455
commit 1d117f65f0
7 changed files with 250 additions and 43 deletions

View file

@ -41,8 +41,7 @@ final class ConfinedSession extends MemorySessionImpl {
private int asyncReleaseCount = 0; private int asyncReleaseCount = 0;
static final VarHandle ASYNC_RELEASE_COUNT= MhUtil.findVarHandle( static final VarHandle ASYNC_RELEASE_COUNT= MhUtil.findVarHandle(MethodHandles.lookup(), "asyncReleaseCount", int.class);
MethodHandles.lookup(), "asyncReleaseCount", int.class);
public ConfinedSession(Thread owner) { public ConfinedSession(Thread owner) {
super(owner, new ConfinedResourceList()); super(owner, new ConfinedResourceList());
@ -52,17 +51,17 @@ final class ConfinedSession extends MemorySessionImpl {
@ForceInline @ForceInline
public void acquire0() { public void acquire0() {
checkValidState(); checkValidState();
if (state == MAX_FORKS) { if (acquireCount == MAX_FORKS) {
throw tooManyAcquires(); throw tooManyAcquires();
} }
state++; acquireCount++;
} }
@Override @Override
@ForceInline @ForceInline
public void release0() { public void release0() {
if (Thread.currentThread() == owner) { if (Thread.currentThread() == owner) {
state--; acquireCount--;
} else { } else {
// It is possible to end up here in two cases: this session was kept alive by some other confined session // It is possible to end up here in two cases: this session was kept alive by some other confined session
// which is implicitly released (in which case the release call comes from the cleaner thread). Or, // which is implicitly released (in which case the release call comes from the cleaner thread). Or,
@ -75,11 +74,11 @@ final class ConfinedSession extends MemorySessionImpl {
void justClose() { void justClose() {
checkValidState(); checkValidState();
int asyncCount = (int)ASYNC_RELEASE_COUNT.getVolatile(this); int asyncCount = (int)ASYNC_RELEASE_COUNT.getVolatile(this);
if ((state == 0 && asyncCount == 0) int acquire = acquireCount - asyncCount;
|| ((state - asyncCount) == 0)) { if (acquire == 0) {
state = CLOSED; state = CLOSED;
} else { } else {
throw alreadyAcquired(state - asyncCount); throw alreadyAcquired(acquire);
} }
} }

View file

@ -42,6 +42,7 @@ non-sealed class GlobalSession extends MemorySessionImpl {
public GlobalSession() { public GlobalSession() {
super(null, null); super(null, null);
this.state = NONCLOSEABLE;
} }
@Override @Override
@ -50,11 +51,6 @@ non-sealed class GlobalSession extends MemorySessionImpl {
// do nothing // do nothing
} }
@Override
public boolean isCloseable() {
return false;
}
@Override @Override
@ForceInline @ForceInline
public void acquire0() { public void acquire0() {

View file

@ -42,6 +42,7 @@ final class ImplicitSession extends SharedSession {
public ImplicitSession(Cleaner cleaner) { public ImplicitSession(Cleaner cleaner) {
super(); super();
this.state = NONCLOSEABLE;
cleaner.register(this, resourceList); cleaner.register(this, resourceList);
} }
@ -55,11 +56,6 @@ final class ImplicitSession extends SharedSession {
// do nothing // do nothing
} }
@Override
public boolean isCloseable() {
return false;
}
@Override @Override
public void justClose() { public void justClose() {
throw nonCloseable(); throw nonCloseable();

View file

@ -38,6 +38,7 @@ import jdk.internal.foreign.GlobalSession.HeapSession;
import jdk.internal.misc.ScopedMemoryAccess; import jdk.internal.misc.ScopedMemoryAccess;
import jdk.internal.invoke.MhUtil; import jdk.internal.invoke.MhUtil;
import jdk.internal.vm.annotation.ForceInline; import jdk.internal.vm.annotation.ForceInline;
import jdk.internal.vm.annotation.Stable;
/** /**
* This class manages the temporal bounds associated with a memory segment as well * This class manages the temporal bounds associated with a memory segment as well
@ -55,11 +56,19 @@ import jdk.internal.vm.annotation.ForceInline;
public abstract sealed class MemorySessionImpl public abstract sealed class MemorySessionImpl
implements Scope implements Scope
permits ConfinedSession, GlobalSession, SharedSession { permits ConfinedSession, GlobalSession, SharedSession {
/**
* The value of the {@code state} of a {@code MemorySessionImpl}. The only possible transition
* is OPEN -> CLOSED. As a result, the states CLOSED and NONCLOSEABLE are stable. This allows
* us to annotate {@code state} with {@link Stable} and elide liveness check on non-closeable
* constant scopes, such as {@code GLOBAL_SESSION}.
*/
static final int OPEN = 0; static final int OPEN = 0;
static final int CLOSED = -1; static final int CLOSED = -1;
static final int NONCLOSEABLE = 1;
static final VarHandle STATE = MhUtil.findVarHandle( static final VarHandle STATE = MhUtil.findVarHandle(MethodHandles.lookup(), "state", int.class);
MethodHandles.lookup(), "state", int.class); static final VarHandle ACQUIRE_COUNT = MhUtil.findVarHandle(MethodHandles.lookup(), "acquireCount", int.class);
static final int MAX_FORKS = Integer.MAX_VALUE; static final int MAX_FORKS = Integer.MAX_VALUE;
@ -70,7 +79,11 @@ public abstract sealed class MemorySessionImpl
final ResourceList resourceList; final ResourceList resourceList;
final Thread owner; final Thread owner;
int state = OPEN;
@Stable
int state;
int acquireCount;
public Arena asArena() { public Arena asArena() {
return new ArenaImpl(this); return new ArenaImpl(this);
@ -214,8 +227,8 @@ public abstract sealed class MemorySessionImpl
throw new CloneNotSupportedException(); throw new CloneNotSupportedException();
} }
public boolean isCloseable() { public final boolean isCloseable() {
return true; return state <= OPEN;
} }
/** /**

View file

@ -44,6 +44,8 @@ sealed class SharedSession extends MemorySessionImpl permits ImplicitSession {
private static final ScopedMemoryAccess SCOPED_MEMORY_ACCESS = ScopedMemoryAccess.getScopedMemoryAccess(); private static final ScopedMemoryAccess SCOPED_MEMORY_ACCESS = ScopedMemoryAccess.getScopedMemoryAccess();
private static final int CLOSED_ACQUIRE_COUNT = -1;
SharedSession() { SharedSession() {
super(null, new SharedResourceList()); super(null, new SharedResourceList());
} }
@ -53,15 +55,15 @@ sealed class SharedSession extends MemorySessionImpl permits ImplicitSession {
public void acquire0() { public void acquire0() {
int value; int value;
do { do {
value = (int) STATE.getVolatile(this); value = (int) ACQUIRE_COUNT.getVolatile(this);
if (value < OPEN) { if (value < 0) {
//segment is not open! //segment is not open!
throw alreadyClosed(); throw sharedSessionAlreadyClosed();
} else if (value == MAX_FORKS) { } else if (value == MAX_FORKS) {
//overflow //overflow
throw tooManyAcquires(); throw tooManyAcquires();
} }
} while (!STATE.compareAndSet(this, value, value + 1)); } while (!ACQUIRE_COUNT.compareAndSet(this, value, value + 1));
} }
@Override @Override
@ -69,24 +71,35 @@ sealed class SharedSession extends MemorySessionImpl permits ImplicitSession {
public void release0() { public void release0() {
int value; int value;
do { do {
value = (int) STATE.getVolatile(this); value = (int) ACQUIRE_COUNT.getVolatile(this);
if (value <= OPEN) { if (value <= 0) {
//cannot get here - we can't close segment twice //cannot get here - we can't close segment twice
throw alreadyClosed(); throw sharedSessionAlreadyClosed();
} }
} while (!STATE.compareAndSet(this, value, value - 1)); } while (!ACQUIRE_COUNT.compareAndSet(this, value, value - 1));
} }
void justClose() { void justClose() {
int prevState = (int) STATE.compareAndExchange(this, OPEN, CLOSED); int acquireCount = (int) ACQUIRE_COUNT.compareAndExchange(this, 0, CLOSED_ACQUIRE_COUNT);
if (prevState < 0) { if (acquireCount < 0) {
throw alreadyClosed(); throw sharedSessionAlreadyClosed();
} else if (prevState != OPEN) { } else if (acquireCount > 0) {
throw alreadyAcquired(prevState); throw alreadyAcquired(acquireCount);
} }
STATE.setVolatile(this, CLOSED);
SCOPED_MEMORY_ACCESS.closeScope(this, ALREADY_CLOSED); SCOPED_MEMORY_ACCESS.closeScope(this, ALREADY_CLOSED);
} }
private IllegalStateException sharedSessionAlreadyClosed() {
// To avoid the situation where a scope fails to be acquired or closed but still reports as
// alive afterward, we wait for the state to change before throwing the exception
while ((int) STATE.getVolatile(this) == OPEN) {
Thread.onSpinWait();
}
return alreadyClosed();
}
/** /**
* A shared resource list; this implementation has to handle add vs. add races, as well as add vs. cleanup races. * A shared resource list; this implementation has to handle add vs. add races, as well as add vs. cleanup races.
*/ */

View file

@ -1,5 +1,5 @@
/* /*
* Copyright (c) 2021, 2023, Oracle and/or its affiliates. All rights reserved. * Copyright (c) 2021, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
* *
* This code is free software; you can redistribute it and/or modify it * This code is free software; you can redistribute it and/or modify it
@ -28,12 +28,6 @@
*/ */
import java.lang.foreign.Arena; import java.lang.foreign.Arena;
import jdk.internal.foreign.MemorySessionImpl;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
import java.lang.reflect.Method; import java.lang.reflect.Method;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
@ -41,6 +35,11 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier; import java.util.function.Supplier;
import java.util.stream.IntStream; import java.util.stream.IntStream;
import jdk.internal.foreign.MemorySessionImpl;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
public class TestMemorySession { public class TestMemorySession {
@ -319,6 +318,70 @@ public class TestMemorySession {
assertEquals(sessionImpl.isCloseableBy(otherThread), isCloseableByOther); assertEquals(sessionImpl.isCloseableBy(otherThread), isCloseableByOther);
} }
/**
* Test that a thread failing to acquire a scope will not observe it as alive afterwards.
*/
@Test
public void testAcquireCloseRace() throws InterruptedException {
int iteration = 1000;
AtomicInteger lock = new AtomicInteger();
boolean[] result = new boolean[1];
lock.set(-2);
MemorySessionImpl[] scopes = new MemorySessionImpl[iteration];
for (int i = 0; i < iteration; i++) {
scopes[i] = MemorySessionImpl.toMemorySession(Arena.ofShared());
}
// This thread tries to close the scopes
Thread t1 = new Thread(() -> {
for (int i = 0; i < iteration; i++) {
MemorySessionImpl scope = scopes[i];
while (true) {
try {
scope.close();
break;
} catch (IllegalStateException e) {}
}
// Keep the 2 threads operating on the same scope
int k = lock.getAndAdd(1) + 1;
while (k != i * 2) {
Thread.onSpinWait();
k = lock.get();
}
}
});
// This thread tries to acquire the scopes, then check if it is alive after an acquire failure
Thread t2 = new Thread(() -> {
for (int i = 0; i < iteration; i++) {
MemorySessionImpl scope = scopes[i];
while (true) {
try {
scope.acquire0();
} catch (IllegalStateException e) {
if (scope.isAlive()) {
result[0] = true;
}
break;
}
scope.release0();
}
// Keep the 2 threads operating on the same scope
int k = lock.getAndAdd(1) + 1;
while (k != i * 2) {
Thread.onSpinWait();
k = lock.get();
}
}
});
t1.start();
t2.start();
t1.join();
t2.join();
assertFalse(result[0]);
}
private void waitSomeTime() { private void waitSomeTime() {
try { try {
Thread.sleep(10); Thread.sleep(10);

View file

@ -0,0 +1,127 @@
/*
* Copyright (c) 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
* under the terms of the GNU General Public License version 2 only, as
* published by the Free Software Foundation.
*
* This code is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
* version 2 for more details (a copy is included in the LICENSE file that
* accompanied this code).
*
* You should have received a copy of the GNU General Public License version
* 2 along with this work; if not, write to the Free Software Foundation,
* Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA.
*
* Please contact Oracle, 500 Oracle Parkway, Redwood Shores, CA 94065 USA
* or visit www.oracle.com if you need additional information or have any
* questions.
*/
package org.openjdk.bench.java.lang.foreign;
import java.lang.foreign.Arena;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.CompilerControl;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.TearDown;
import sun.misc.Unsafe;
@BenchmarkMode(Mode.AverageTime)
@Warmup(iterations = 5, time = 500, timeUnit = TimeUnit.MILLISECONDS)
@Measurement(iterations = 10, time = 500, timeUnit = TimeUnit.MILLISECONDS)
@State(org.openjdk.jmh.annotations.Scope.Thread)
@OutputTimeUnit(TimeUnit.MICROSECONDS)
@Fork(value = 3, jvmArgsAppend = { "--enable-native-access=ALL-UNNAMED" })
public class LoopOverRandom extends JavaLayouts {
static final int SEED = 0;
static final long ELEM_SIZE = ValueLayout.JAVA_INT.byteSize();
static final int ELEM_COUNT = 1_000;
static final long ALLOC_SIZE = ELEM_COUNT * ELEM_SIZE;
static final Unsafe unsafe = Utils.unsafe;
Arena arena;
MemorySegment segment;
int[] indices;
static final MemorySegment ALL = MemorySegment.NULL.reinterpret(Long.MAX_VALUE);
@Setup
public void setup() {
indices = new Random(SEED).ints(0, ELEM_COUNT).limit(ELEM_COUNT).toArray();
arena = Arena.ofConfined();
segment = arena.allocate(ALLOC_SIZE);
for (int i = 0; i < ELEM_COUNT; i++) {
segment.setAtIndex(ValueLayout.JAVA_INT, i, i);
}
}
@TearDown
public void tearDown() {
arena.close();
}
@Benchmark
public long segment_loop() {
int sum = 0;
for (int i = 0; i < ELEM_COUNT; i++) {
sum += segment.getAtIndex(ValueLayout.JAVA_INT_UNALIGNED, indices[i]);
target_dontInline();
}
return sum;
}
@Benchmark
public long segment_loop_all() {
int sum = 0;
for (int i = 0; i < ELEM_COUNT; i++) {
sum += ALL.get(ValueLayout.JAVA_INT_UNALIGNED, segment.address() + indices[i] * ELEM_SIZE);
target_dontInline();
}
return sum;
}
@Benchmark
public long segment_loop_asUnchecked() {
int sum = 0;
for (int i = 0; i < ELEM_COUNT; i++) {
sum += asUnchecked(segment).getAtIndex(ValueLayout.JAVA_INT_UNALIGNED, indices[i]);
target_dontInline();
}
return sum;
}
@Benchmark
public long unsafe_loop() {
int sum = 0;
for (int i = 0; i < ELEM_COUNT; i++) {
sum += unsafe.getInt(segment.address() + indices[i] * ELEM_SIZE);
target_dontInline();
}
return sum;
}
MemorySegment asUnchecked(MemorySegment segment) {
return MemorySegment.ofAddress(segment.address()).reinterpret(Long.MAX_VALUE);
}
@CompilerControl(CompilerControl.Mode.DONT_INLINE)
public void target_dontInline() {
// this method was intentionally left blank
}
}