8329593: Drop adjustments to target parallelism when virtual threads do I/O on files opened for buffered I/O

Reviewed-by: bpb, jpai
This commit is contained in:
Alan Bateman 2024-04-23 16:10:13 +00:00
parent b07e1531b3
commit 412e306d81
28 changed files with 461 additions and 870 deletions

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 2003, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2003, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -207,11 +207,11 @@ public final class FileDescriptor {
* @since 1.1
*/
public void sync() throws SyncFailedException {
long comp = Blocker.begin();
boolean attempted = Blocker.begin();
try {
sync0();
} finally {
Blocker.end(comp);
Blocker.end(attempted);
}
}

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 1994, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 1994, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -27,7 +27,6 @@ package java.io;
import java.nio.channels.FileChannel;
import java.util.Arrays;
import jdk.internal.misc.Blocker;
import jdk.internal.util.ArraysSupport;
import sun.nio.ch.FileChannelImpl;
@ -210,12 +209,7 @@ public class FileInputStream extends InputStream
* @param name the name of the file
*/
private void open(String name) throws FileNotFoundException {
long comp = Blocker.begin();
try {
open0(name);
} finally {
Blocker.end(comp);
}
open0(name);
}
/**
@ -228,12 +222,7 @@ public class FileInputStream extends InputStream
*/
@Override
public int read() throws IOException {
long comp = Blocker.begin();
try {
return read0();
} finally {
Blocker.end(comp);
}
return read0();
}
private native int read0() throws IOException;
@ -260,12 +249,7 @@ public class FileInputStream extends InputStream
*/
@Override
public int read(byte[] b) throws IOException {
long comp = Blocker.begin();
try {
return readBytes(b, 0, b.length);
} finally {
Blocker.end(comp);
}
return readBytes(b, 0, b.length);
}
/**
@ -284,12 +268,7 @@ public class FileInputStream extends InputStream
*/
@Override
public int read(byte[] b, int off, int len) throws IOException {
long comp = Blocker.begin();
try {
return readBytes(b, off, len);
} finally {
Blocker.end(comp);
}
return readBytes(b, off, len);
}
@Override
@ -396,22 +375,12 @@ public class FileInputStream extends InputStream
}
private long length() throws IOException {
long comp = Blocker.begin();
try {
return length0();
} finally {
Blocker.end(comp);
}
return length0();
}
private native long length0() throws IOException;
private long position() throws IOException {
long comp = Blocker.begin();
try {
return position0();
} finally {
Blocker.end(comp);
}
return position0();
}
private native long position0() throws IOException;
@ -441,12 +410,7 @@ public class FileInputStream extends InputStream
*/
@Override
public long skip(long n) throws IOException {
long comp = Blocker.begin();
try {
return skip0(n);
} finally {
Blocker.end(comp);
}
return skip0(n);
}
private native long skip0(long n) throws IOException;
@ -470,12 +434,7 @@ public class FileInputStream extends InputStream
*/
@Override
public int available() throws IOException {
long comp = Blocker.begin();
try {
return available0();
} finally {
Blocker.end(comp);
}
return available0();
}
private native int available0() throws IOException;
@ -566,8 +525,8 @@ public class FileInputStream extends InputStream
synchronized (this) {
fc = this.channel;
if (fc == null) {
this.channel = fc = FileChannelImpl.open(fd, path, true,
false, false, this);
fc = FileChannelImpl.open(fd, path, true, false, false, false, this);
this.channel = fc;
if (closed) {
try {
// possible race with close(), benign since

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 1994, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 1994, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -28,7 +28,6 @@ package java.io;
import java.nio.channels.FileChannel;
import jdk.internal.access.SharedSecrets;
import jdk.internal.access.JavaIOFileDescriptorAccess;
import jdk.internal.misc.Blocker;
import sun.nio.ch.FileChannelImpl;
@ -286,12 +285,7 @@ public class FileOutputStream extends OutputStream
* @param append whether the file is to be opened in append mode
*/
private void open(String name, boolean append) throws FileNotFoundException {
long comp = Blocker.begin();
try {
open0(name, append);
} finally {
Blocker.end(comp);
}
open0(name, append);
}
/**
@ -313,12 +307,7 @@ public class FileOutputStream extends OutputStream
@Override
public void write(int b) throws IOException {
boolean append = FD_ACCESS.getAppend(fd);
long comp = Blocker.begin();
try {
write(b, append);
} finally {
Blocker.end(comp);
}
write(b, append);
}
/**
@ -343,12 +332,7 @@ public class FileOutputStream extends OutputStream
@Override
public void write(byte[] b) throws IOException {
boolean append = FD_ACCESS.getAppend(fd);
long comp = Blocker.begin();
try {
writeBytes(b, 0, b.length, append);
} finally {
Blocker.end(comp);
}
writeBytes(b, 0, b.length, append);
}
/**
@ -364,12 +348,7 @@ public class FileOutputStream extends OutputStream
@Override
public void write(byte[] b, int off, int len) throws IOException {
boolean append = FD_ACCESS.getAppend(fd);
long comp = Blocker.begin();
try {
writeBytes(b, off, len, append);
} finally {
Blocker.end(comp);
}
writeBytes(b, off, len, append);
}
/**
@ -460,8 +439,8 @@ public class FileOutputStream extends OutputStream
synchronized (this) {
fc = this.channel;
if (fc == null) {
this.channel = fc = FileChannelImpl.open(fd, path, false,
true, false, this);
fc = FileChannelImpl.open(fd, path, false, true, false, false, this);
this.channel = fc;
if (closed) {
try {
// possible race with close(), benign since

View file

@ -71,6 +71,7 @@ public class RandomAccessFile implements DataOutput, DataInput, Closeable {
private final FileDescriptor fd;
private final boolean rw;
private final boolean sync; // O_SYNC or O_DSYNC
/**
* The path of the referenced file
@ -229,21 +230,25 @@ public class RandomAccessFile implements DataOutput, DataInput, Closeable {
int imode = -1;
boolean rw = false;
boolean sync = false;
if (mode.equals("r"))
imode = O_RDONLY;
else if (mode.startsWith("rw")) {
imode = O_RDWR;
rw = true;
if (mode.length() > 2) {
if (mode.equals("rws"))
if (mode.equals("rws")) {
imode |= O_SYNC;
else if (mode.equals("rwd"))
sync = true;
} else if (mode.equals("rwd")) {
imode |= O_DSYNC;
else
sync = true;
} else
imode = -1;
}
}
this.rw = rw;
this.sync = sync;
if (openAndDelete)
imode |= O_TEMPORARY;
@ -308,8 +313,8 @@ public class RandomAccessFile implements DataOutput, DataInput, Closeable {
synchronized (this) {
fc = this.channel;
if (fc == null) {
this.channel = fc = FileChannelImpl.open(fd, path, true,
rw, false, this);
fc = FileChannelImpl.open(fd, path, true, rw, sync, false, this);
this.channel = fc;
if (closed) {
try {
fc.close();
@ -350,12 +355,7 @@ public class RandomAccessFile implements DataOutput, DataInput, Closeable {
* defined above
*/
private void open(String name, int mode) throws FileNotFoundException {
long comp = Blocker.begin();
try {
open0(name, mode);
} finally {
Blocker.end(comp);
}
open0(name, mode);
}
// 'Read' primitives
@ -376,12 +376,7 @@ public class RandomAccessFile implements DataOutput, DataInput, Closeable {
* end-of-file has been reached.
*/
public int read() throws IOException {
long comp = Blocker.begin();
try {
return read0();
} finally {
Blocker.end(comp);
}
return read0();
}
private native int read0() throws IOException;
@ -394,12 +389,7 @@ public class RandomAccessFile implements DataOutput, DataInput, Closeable {
* @throws IOException If an I/O error has occurred.
*/
private int readBytes(byte[] b, int off, int len) throws IOException {
long comp = Blocker.begin();
try {
return readBytes0(b, off, len);
} finally {
Blocker.end(comp);
}
return readBytes0(b, off, len);
}
private native int readBytes0(byte[] b, int off, int len) throws IOException;
@ -547,11 +537,11 @@ public class RandomAccessFile implements DataOutput, DataInput, Closeable {
* @throws IOException if an I/O error occurs.
*/
public void write(int b) throws IOException {
long comp = Blocker.begin();
boolean attempted = Blocker.begin(sync);
try {
write0(b);
} finally {
Blocker.end(comp);
Blocker.end(attempted);
}
}
@ -566,11 +556,11 @@ public class RandomAccessFile implements DataOutput, DataInput, Closeable {
* @throws IOException If an I/O error has occurred.
*/
private void writeBytes(byte[] b, int off, int len) throws IOException {
long comp = Blocker.begin();
boolean attempted = Blocker.begin(sync);
try {
writeBytes0(b, off, len);
} finally {
Blocker.end(comp);
Blocker.end(attempted);
}
}
@ -630,12 +620,7 @@ public class RandomAccessFile implements DataOutput, DataInput, Closeable {
if (pos < 0) {
throw new IOException("Negative seek offset");
}
long comp = Blocker.begin();
try {
seek0(pos);
} finally {
Blocker.end(comp);
}
seek0(pos);
}
private native void seek0(long pos) throws IOException;
@ -647,12 +632,7 @@ public class RandomAccessFile implements DataOutput, DataInput, Closeable {
* @throws IOException if an I/O error occurs.
*/
public long length() throws IOException {
long comp = Blocker.begin();
try {
return length0();
} finally {
Blocker.end(comp);
}
return length0();
}
private native long length0() throws IOException;
@ -684,12 +664,7 @@ public class RandomAccessFile implements DataOutput, DataInput, Closeable {
* @since 1.2
*/
public void setLength(long newLength) throws IOException {
long comp = Blocker.begin();
try {
setLength0(newLength);
} finally {
Blocker.end(comp);
}
setLength0(newLength);
}
private native void setLength0(long newLength) throws IOException;

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 1994, 2023, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 1994, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -370,16 +370,21 @@ public class Object {
* @see #wait(long, int)
*/
public final void wait(long timeoutMillis) throws InterruptedException {
long comp = Blocker.begin();
if (!Thread.currentThread().isVirtual()) {
wait0(timeoutMillis);
return;
}
// virtual thread waiting
boolean attempted = Blocker.begin();
try {
wait0(timeoutMillis);
} catch (InterruptedException e) {
Thread thread = Thread.currentThread();
if (thread.isVirtual())
thread.getAndClearInterrupt();
// virtual thread's interrupt status needs to be cleared
Thread.currentThread().getAndClearInterrupt();
throw e;
} finally {
Blocker.end(comp);
Blocker.end(attempted);
}
}

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 1995, 2021, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 1995, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -25,6 +25,7 @@
package java.lang;
import jdk.internal.misc.Blocker;
import jdk.internal.util.StaticProperty;
import java.io.*;
@ -839,6 +840,75 @@ public abstract class Process {
return n - remaining;
}
@Override
public int read() throws IOException {
boolean attempted = Blocker.begin();
try {
return super.read();
} finally {
Blocker.end(attempted);
}
}
@Override
public int read(byte[] b) throws IOException {
boolean attempted = Blocker.begin();
try {
return super.read(b);
} finally {
Blocker.end(attempted);
}
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
boolean attempted = Blocker.begin();
try {
return super.read(b, off, len);
} finally {
Blocker.end(attempted);
}
}
}
/**
* An output stream for a subprocess pipe.
*/
static class PipeOutputStream extends FileOutputStream {
PipeOutputStream(FileDescriptor fd) {
super(fd);
}
@Override
public void write(int b) throws IOException {
boolean attempted = Blocker.begin();
try {
super.write(b);
} finally {
Blocker.end(attempted);
}
}
@Override
public void write(byte[] b) throws IOException {
boolean attempted = Blocker.begin();
try {
super.write(b);
} finally {
Blocker.end(attempted);
}
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
boolean attempted = Blocker.begin();
try {
super.write(b, off, len);
} finally {
Blocker.end(attempted);
}
}
}
/**

View file

@ -71,6 +71,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Stream;
import jdk.internal.logger.LoggerFinderLoader.TemporaryLoggerFinder;
import jdk.internal.misc.Blocker;
import jdk.internal.misc.CarrierThreadLocal;
import jdk.internal.misc.Unsafe;
import jdk.internal.util.StaticProperty;
@ -2190,9 +2191,9 @@ public final class System {
lineSeparator = props.getProperty("line.separator");
FileInputStream fdIn = new FileInputStream(FileDescriptor.in);
FileOutputStream fdOut = new FileOutputStream(FileDescriptor.out);
FileOutputStream fdErr = new FileOutputStream(FileDescriptor.err);
FileInputStream fdIn = new In(FileDescriptor.in);
FileOutputStream fdOut = new Out(FileDescriptor.out);
FileOutputStream fdErr = new Out(FileDescriptor.err);
initialIn = new BufferedInputStream(fdIn);
setIn0(initialIn);
// stdout/err.encoding are set when the VM is associated with the terminal,
@ -2217,6 +2218,83 @@ public final class System {
VM.initLevel(1);
}
/**
* System.in.
*/
private static class In extends FileInputStream {
In(FileDescriptor fd) {
super(fd);
}
@Override
public int read() throws IOException {
boolean attempted = Blocker.begin();
try {
return super.read();
} finally {
Blocker.end(attempted);
}
}
@Override
public int read(byte[] b) throws IOException {
boolean attempted = Blocker.begin();
try {
return super.read(b);
} finally {
Blocker.end(attempted);
}
}
@Override
public int read(byte[] b, int off, int len) throws IOException {
boolean attempted = Blocker.begin();
try {
return super.read(b, off, len);
} finally {
Blocker.end(attempted);
}
}
}
/**
* System.out/System.err wrap this output stream.
*/
private static class Out extends FileOutputStream {
Out(FileDescriptor fd) {
super(fd);
}
public void write(int b) throws IOException {
boolean attempted = Blocker.begin();
try {
super.write(b);
} finally {
Blocker.end(attempted);
}
}
@Override
public void write(byte[] b) throws IOException {
boolean attempted = Blocker.begin();
try {
super.write(b);
} finally {
Blocker.end(attempted);
}
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
boolean attempted = Blocker.begin();
try {
super.write(b, off, len);
} finally {
Blocker.end(attempted);
}
}
}
// @see #initPhase2()
static ModuleLayer bootLayer;

View file

@ -460,6 +460,11 @@ final class VirtualThread extends BaseVirtualThread {
private void afterYield() {
assert carrierThread == null;
// re-adjust parallelism if the virtual thread yielded when compensating
if (currentThread() instanceof CarrierThread ct) {
ct.endBlocking();
}
int s = state();
// LockSupport.park/parkNanos

View file

@ -1216,11 +1216,11 @@ public sealed class InetAddress implements Serializable permits Inet4Address, In
Objects.requireNonNull(policy);
validate(host);
InetAddress[] addrs;
long comp = Blocker.begin();
boolean attempted = Blocker.begin();
try {
addrs = impl.lookupAllHostAddr(host, policy);
} finally {
Blocker.end(comp);
Blocker.end(attempted);
}
return Arrays.stream(addrs);
}
@ -1230,11 +1230,11 @@ public sealed class InetAddress implements Serializable permits Inet4Address, In
if (addr.length != Inet4Address.INADDRSZ && addr.length != Inet6Address.INADDRSZ) {
throw new IllegalArgumentException("Invalid address length");
}
long comp = Blocker.begin();
boolean attempted = Blocker.begin();
try {
return impl.getHostByAddr(addr);
} finally {
Blocker.end(comp);
Blocker.end(attempted);
}
}
}

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 2020, 2022, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2020, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -98,13 +98,13 @@ import jdk.internal.misc.Unsafe;
long offset = mappingOffset(address, index);
long mappingAddress = mappingAddress(address, offset, index);
long mappingLength = mappingLength(offset, length);
long comp = Blocker.begin();
boolean attempted = Blocker.begin();
try {
force0(fd, mappingAddress, mappingLength);
} catch (IOException cause) {
throw new UncheckedIOException(cause);
} finally {
Blocker.end(comp);
Blocker.end(attempted);
}
}
}

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 2019, 2022, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2019, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -25,20 +25,18 @@
package jdk.internal.misc;
import java.util.concurrent.ForkJoinPool;
import jdk.internal.access.JavaLangAccess;
import jdk.internal.access.JavaUtilConcurrentFJPAccess;
import jdk.internal.access.SharedSecrets;
/**
* Defines static methods to mark the beginning and end of a possibly blocking
* operation. The methods are intended to be used with try-finally as follows:
* {@snippet lang=java :
* long comp = Blocker.begin();
* boolean attempted = Blocker.begin();
* try {
* // blocking operation
* } finally {
* Blocker.end(comp);
* Blocker.end(attempted);
* }
* }
* If invoked from a virtual thread and the underlying carrier thread is a
@ -62,64 +60,35 @@ public class Blocker {
}
/**
* Marks the beginning of a possibly blocking operation.
* @return the return value from the attempt to compensate or -1 if not attempted
* Marks the beginning of a blocking operation.
* @return true if tryCompensate attempted
*/
public static long begin() {
public static boolean begin() {
if (VM.isBooted()
&& currentCarrierThread() instanceof CarrierThread ct && !ct.inBlocking()) {
ct.beginBlocking();
boolean completed = false;
try {
long comp = ForkJoinPools.beginCompensatedBlock(ct.getPool());
assert currentCarrierThread() == ct;
completed = true;
return comp;
} finally {
if (!completed) {
ct.endBlocking();
}
}
&& Thread.currentThread().isVirtual()
&& currentCarrierThread() instanceof CarrierThread ct) {
return ct.beginBlocking();
}
return -1;
return false;
}
/**
* Marks the beginning of a possibly blocking operation.
* @param blocking true if the operation may block, otherwise false
* @return the return value from the attempt to compensate, -1 if not attempted
* or blocking is false
* @return true if tryCompensate attempted
*/
public static long begin(boolean blocking) {
return (blocking) ? begin() : -1;
public static boolean begin(boolean blocking) {
return (blocking) ? begin() : false;
}
/**
* Marks the end of an operation that may have blocked.
* @param compensateReturn the value returned by the begin method
* @param attempted if tryCompensate attempted
*/
public static void end(long compensateReturn) {
if (compensateReturn >= 0) {
assert currentCarrierThread() instanceof CarrierThread ct && ct.inBlocking();
public static void end(boolean attempted) {
if (attempted) {
CarrierThread ct = (CarrierThread) currentCarrierThread();
ForkJoinPools.endCompensatedBlock(ct.getPool(), compensateReturn);
ct.endBlocking();
}
}
/**
* Defines static methods to invoke non-public ForkJoinPool methods via the
* shared secret support.
*/
private static class ForkJoinPools {
private static final JavaUtilConcurrentFJPAccess FJP_ACCESS =
SharedSecrets.getJavaUtilConcurrentFJPAccess();
static long beginCompensatedBlock(ForkJoinPool pool) {
return FJP_ACCESS.beginCompensatedBlock(pool);
}
static void endCompensatedBlock(ForkJoinPool pool, long post) {
FJP_ACCESS.endCompensatedBlock(pool, post);
}
}
}

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 2021, 2022, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2021, 2024, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -32,7 +32,9 @@ import java.security.ProtectionDomain;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinWorkerThread;
import jdk.internal.access.JavaLangAccess;
import jdk.internal.access.JavaUtilConcurrentFJPAccess;
import jdk.internal.access.SharedSecrets;
import jdk.internal.vm.Continuation;
/**
* A ForkJoinWorkerThread that can be used as a carrier thread.
@ -49,7 +51,14 @@ public class CarrierThread extends ForkJoinWorkerThread {
private static final long INHERITABLETHREADLOCALS;
private static final long INHERITEDACCESSCONTROLCONTEXT;
private boolean blocking; // true if in blocking op
// compensating state
private static final int NOT_COMPENSATING = 0;
private static final int COMPENSATE_IN_PROGRESS = 1;
private static final int COMPENSATING = 2;
private int compensating;
// FJP value to adjust release counts
private long compensateValue;
@SuppressWarnings("this-escape")
public CarrierThread(ForkJoinPool pool) {
@ -60,27 +69,44 @@ public class CarrierThread extends ForkJoinWorkerThread {
}
/**
* For use by {@link Blocker} to test if the thread is in a blocking operation.
* Mark the start of a blocking operation.
*/
boolean inBlocking() {
//assert JLA.currentCarrierThread() == this;
return blocking;
public boolean beginBlocking() {
assert Thread.currentThread().isVirtual() && JLA.currentCarrierThread() == this;
assert compensating == NOT_COMPENSATING || compensating == COMPENSATING;
if (compensating == NOT_COMPENSATING) {
// don't preempt when attempting to compensate
Continuation.pin();
try {
compensating = COMPENSATE_IN_PROGRESS;
// Uses FJP.tryCompensate to start or re-activate a spare thread
compensateValue = ForkJoinPools.beginCompensatedBlock(getPool());
compensating = COMPENSATING;
return true;
} catch (Throwable e) {
// exception starting spare thread
compensating = NOT_COMPENSATING;
throw e;
} finally {
Continuation.unpin();
}
} else {
return false;
}
}
/**
* For use by {@link Blocker} to mark the start of a blocking operation.
* Mark the end of a blocking operation.
*/
void beginBlocking() {
//assert JLA.currentCarrierThread() == this && !blocking;
blocking = true;
}
/**
* For use by {@link Blocker} to mark the end of a blocking operation.
*/
void endBlocking() {
//assert JLA.currentCarrierThread() == this && blocking;
blocking = false;
public void endBlocking() {
assert Thread.currentThread() == this || JLA.currentCarrierThread() == this;
if (compensating == COMPENSATING) {
ForkJoinPools.endCompensatedBlock(getPool(), compensateValue);
compensating = NOT_COMPENSATING;
compensateValue = 0;
}
}
@Override
@ -95,7 +121,7 @@ public class CarrierThread extends ForkJoinWorkerThread {
* The thread group for the carrier threads.
*/
@SuppressWarnings("removal")
private static final ThreadGroup carrierThreadGroup() {
private static ThreadGroup carrierThreadGroup() {
return AccessController.doPrivileged(new PrivilegedAction<ThreadGroup>() {
public ThreadGroup run() {
ThreadGroup group = JLA.currentCarrierThread().getThreadGroup();
@ -117,6 +143,21 @@ public class CarrierThread extends ForkJoinWorkerThread {
});
}
/**
* Defines static methods to invoke non-public ForkJoinPool methods via the
* shared secret support.
*/
private static class ForkJoinPools {
private static final JavaUtilConcurrentFJPAccess FJP_ACCESS =
SharedSecrets.getJavaUtilConcurrentFJPAccess();
static long beginCompensatedBlock(ForkJoinPool pool) {
return FJP_ACCESS.beginCompensatedBlock(pool);
}
static void endCompensatedBlock(ForkJoinPool pool, long post) {
FJP_ACCESS.endCompensatedBlock(pool, post);
}
}
static {
CONTEXTCLASSLOADER = U.objectFieldOffset(Thread.class,
"contextClassLoader");

View file

@ -78,6 +78,7 @@ public class FileChannelImpl
// File access mode (immutable)
private final boolean writable;
private final boolean readable;
private final boolean sync; // O_SYNC or O_DSYNC
// Required to prevent finalization of creating stream (immutable)
private final Closeable parent;
@ -122,12 +123,14 @@ public class FileChannelImpl
}
private FileChannelImpl(FileDescriptor fd, String path, boolean readable,
boolean writable, boolean direct, Closeable parent)
boolean writable, boolean sync, boolean direct,
Closeable parent)
{
this.fd = fd;
this.path = path;
this.readable = readable;
this.writable = writable;
this.sync = sync;
this.direct = direct;
this.parent = parent;
if (direct) {
@ -150,9 +153,9 @@ public class FileChannelImpl
// and RandomAccessFile::getChannel
public static FileChannel open(FileDescriptor fd, String path,
boolean readable, boolean writable,
boolean direct, Closeable parent)
boolean sync, boolean direct, Closeable parent)
{
return new FileChannelImpl(fd, path, readable, writable, direct, parent);
return new FileChannelImpl(fd, path, readable, writable, sync, direct, parent);
}
private void ensureOpen() throws IOException {
@ -230,11 +233,11 @@ public class FileChannelImpl
if (!isOpen())
return 0;
do {
long comp = Blocker.begin();
boolean attempted = Blocker.begin(direct);
try {
n = IOUtil.read(fd, dst, -1, direct, alignment, nd);
} finally {
Blocker.end(comp);
Blocker.end(attempted);
}
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n);
@ -265,11 +268,11 @@ public class FileChannelImpl
if (!isOpen())
return 0;
do {
long comp = Blocker.begin();
boolean attempted = Blocker.begin(direct);
try {
n = IOUtil.read(fd, dsts, offset, length, direct, alignment, nd);
} finally {
Blocker.end(comp);
Blocker.end(attempted);
}
} while ((n == IOStatus.INTERRUPTED) && isOpen());
@ -298,11 +301,11 @@ public class FileChannelImpl
if (!isOpen())
return 0;
do {
long comp = Blocker.begin();
boolean attempted = Blocker.begin(sync || direct);
try {
n = IOUtil.write(fd, src, -1, direct, alignment, nd);
} finally {
Blocker.end(comp);
Blocker.end(attempted);
}
} while ((n == IOStatus.INTERRUPTED) && isOpen());
@ -334,11 +337,11 @@ public class FileChannelImpl
if (!isOpen())
return 0;
do {
long comp = Blocker.begin();
boolean attempted = Blocker.begin(sync || direct);
try {
n = IOUtil.write(fd, srcs, offset, length, direct, alignment, nd);
} finally {
Blocker.end(comp);
Blocker.end(attempted);
}
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n);
@ -365,13 +368,8 @@ public class FileChannelImpl
return 0;
boolean append = fdAccess.getAppend(fd);
do {
long comp = Blocker.begin();
try {
// in append-mode then position is advanced to end before writing
p = (append) ? nd.size(fd) : nd.seek(fd, -1);
} finally {
Blocker.end(comp);
}
// in append-mode then position is advanced to end before writing
p = (append) ? nd.size(fd) : nd.seek(fd, -1);
} while ((p == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(p);
} finally {
@ -396,12 +394,7 @@ public class FileChannelImpl
if (!isOpen())
return null;
do {
long comp = Blocker.begin();
try {
p = nd.seek(fd, newPosition);
} finally {
Blocker.end(comp);
}
p = nd.seek(fd, newPosition);
} while ((p == IOStatus.INTERRUPTED) && isOpen());
return this;
} finally {
@ -424,12 +417,7 @@ public class FileChannelImpl
if (!isOpen())
return -1;
do {
long comp = Blocker.begin();
try {
s = nd.size(fd);
} finally {
Blocker.end(comp);
}
s = nd.size(fd);
} while ((s == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(s);
} finally {
@ -461,24 +449,14 @@ public class FileChannelImpl
// get current size
long size;
do {
long comp = Blocker.begin();
try {
size = nd.size(fd);
} finally {
Blocker.end(comp);
}
size = nd.size(fd);
} while ((size == IOStatus.INTERRUPTED) && isOpen());
if (!isOpen())
return null;
// get current position
do {
long comp = Blocker.begin();
try {
p = nd.seek(fd, -1);
} finally {
Blocker.end(comp);
}
p = nd.seek(fd, -1);
} while ((p == IOStatus.INTERRUPTED) && isOpen());
if (!isOpen())
return null;
@ -487,12 +465,7 @@ public class FileChannelImpl
// truncate file if given size is less than the current size
if (newSize < size) {
do {
long comp = Blocker.begin();
try {
rv = nd.truncate(fd, newSize);
} finally {
Blocker.end(comp);
}
rv = nd.truncate(fd, newSize);
} while ((rv == IOStatus.INTERRUPTED) && isOpen());
if (!isOpen())
return null;
@ -502,12 +475,7 @@ public class FileChannelImpl
if (p > newSize)
p = newSize;
do {
long comp = Blocker.begin();
try {
rp = nd.seek(fd, p);
} finally {
Blocker.end(comp);
}
rp = nd.seek(fd, p);
} while ((rp == IOStatus.INTERRUPTED) && isOpen());
return this;
} finally {
@ -529,11 +497,11 @@ public class FileChannelImpl
if (!isOpen())
return;
do {
long comp = Blocker.begin();
boolean attempted = Blocker.begin();
try {
rv = nd.force(fd, metaData);
} finally {
Blocker.end(comp);
Blocker.end(attempted);
}
} while ((rv == IOStatus.INTERRUPTED) && isOpen());
} finally {
@ -624,12 +592,7 @@ public class FileChannelImpl
long n;
boolean append = fdAccess.getAppend(targetFD);
do {
long comp = Blocker.begin();
try {
n = nd.transferTo(fd, position, count, targetFD, append);
} finally {
Blocker.end(comp);
}
n = nd.transferTo(fd, position, count, targetFD, append);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return n;
}
@ -895,12 +858,7 @@ public class FileChannelImpl
long n;
boolean append = fdAccess.getAppend(fd);
do {
long comp = Blocker.begin();
try {
n = nd.transferFrom(srcFD, fd, position, count, append);
} finally {
Blocker.end(comp);
}
n = nd.transferFrom(srcFD, fd, position, count, append);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return n;
}
@ -1088,11 +1046,11 @@ public class FileChannelImpl
if (!isOpen())
return -1;
do {
long comp = Blocker.begin();
boolean attempted = Blocker.begin(direct);
try {
n = IOUtil.read(fd, dst, position, direct, alignment, nd);
} finally {
Blocker.end(comp);
Blocker.end(attempted);
}
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n);
@ -1133,11 +1091,11 @@ public class FileChannelImpl
if (!isOpen())
return -1;
do {
long comp = Blocker.begin();
boolean attempted = Blocker.begin(sync || direct);
try {
n = IOUtil.write(fd, src, position, direct, alignment, nd);
} finally {
Blocker.end(comp);
Blocker.end(attempted);
}
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n);
@ -1362,12 +1320,7 @@ public class FileChannelImpl
synchronized (positionLock) {
long filesize;
do {
long comp = Blocker.begin();
try {
filesize = nd.size(fd);
} finally {
Blocker.end(comp);
}
filesize = nd.size(fd);
} while ((filesize == IOStatus.INTERRUPTED) && isOpen());
if (!isOpen())
return null;
@ -1379,12 +1332,7 @@ public class FileChannelImpl
}
int rv;
do {
long comp = Blocker.begin();
try {
rv = nd.truncate(fd, position + size);
} finally {
Blocker.end(comp);
}
rv = nd.truncate(fd, position + size);
} while ((rv == IOStatus.INTERRUPTED) && isOpen());
if (!isOpen())
return null;
@ -1575,11 +1523,11 @@ public class FileChannelImpl
return null;
int n;
do {
long comp = Blocker.begin();
boolean attempted = Blocker.begin();
try {
n = nd.lock(fd, true, position, size, shared);
} finally {
Blocker.end(comp);
Blocker.end(attempted);
}
} while ((n == FileDispatcher.INTERRUPTED) && isOpen());
if (isOpen()) {