8234131: Miscellaneous changes imported from jsr166 CVS 2021-01

8257671: ThreadPoolExecutor.Discard*Policy: rejected tasks are not cancelled

Reviewed-by: alanb, prappo, dl
This commit is contained in:
Martin Buchholz 2021-01-09 21:59:27 +00:00
parent 63e3bd7613
commit 270014ab4e
41 changed files with 273 additions and 207 deletions

View file

@ -41,6 +41,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.locks.LockSupport;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
import java.util.function.Consumer;
@ -175,11 +176,11 @@ public class SubmissionPublisher<T> implements Publisher<T>,
/*
* Most mechanics are handled by BufferedSubscription. This class
* mainly tracks subscribers and ensures sequentiality, by using
* built-in synchronization locks across public methods. Using
* built-in locks works well in the most typical case in which
* only one thread submits items. We extend this idea in
* submission methods by detecting single-ownership to reduce
* producer-consumer synchronization strength.
* locks across public methods, to ensure thread-safety in the
* presence of multiple sources and maintain acquire-release
* ordering around user operations. However, we also track whether
* there is only a single source, and if so streamline some buffer
* operations by avoiding some atomics.
*/
/** The largest possible power of two array size. */
@ -234,6 +235,8 @@ public class SubmissionPublisher<T> implements Publisher<T>,
*/
BufferedSubscription<T> clients;
/** Lock for exclusion across multiple sources */
final ReentrantLock lock;
/** Run status, updated only within locks */
volatile boolean closed;
/** Set true on first call to subscribe, to initialize possible owner */
@ -274,6 +277,7 @@ public class SubmissionPublisher<T> implements Publisher<T>,
throw new NullPointerException();
if (maxBufferCapacity <= 0)
throw new IllegalArgumentException("capacity must be positive");
this.lock = new ReentrantLock();
this.executor = executor;
this.onNextHandler = handler;
this.maxBufferCapacity = roundCapacity(maxBufferCapacity);
@ -337,13 +341,15 @@ public class SubmissionPublisher<T> implements Publisher<T>,
*/
public void subscribe(Subscriber<? super T> subscriber) {
if (subscriber == null) throw new NullPointerException();
ReentrantLock lock = this.lock;
int max = maxBufferCapacity; // allocate initial array
Object[] array = new Object[max < INITIAL_CAPACITY ?
max : INITIAL_CAPACITY];
BufferedSubscription<T> subscription =
new BufferedSubscription<T>(subscriber, executor, onNextHandler,
array, max);
synchronized (this) {
lock.lock();
try {
if (!subscribed) {
subscribed = true;
owner = Thread.currentThread();
@ -378,6 +384,8 @@ public class SubmissionPublisher<T> implements Publisher<T>,
pred = b;
b = next;
}
} finally {
lock.unlock();
}
}
@ -390,7 +398,9 @@ public class SubmissionPublisher<T> implements Publisher<T>,
if (item == null) throw new NullPointerException();
int lag = 0;
boolean complete, unowned;
synchronized (this) {
ReentrantLock lock = this.lock;
lock.lock();
try {
Thread t = Thread.currentThread(), o;
BufferedSubscription<T> b = clients;
if ((unowned = ((o = owner) != t)) && o != null)
@ -421,6 +431,8 @@ public class SubmissionPublisher<T> implements Publisher<T>,
if (retries != null || cleanMe)
lag = retryOffer(item, nanos, onDrop, retries, lag, cleanMe);
}
} finally {
lock.unlock();
}
if (complete)
throw new IllegalStateException("Closed");
@ -609,14 +621,18 @@ public class SubmissionPublisher<T> implements Publisher<T>,
* subscribers have yet completed.
*/
public void close() {
ReentrantLock lock = this.lock;
if (!closed) {
BufferedSubscription<T> b;
synchronized (this) {
lock.lock();
try {
// no need to re-check closed here
b = clients;
clients = null;
owner = null;
closed = true;
} finally {
lock.unlock();
}
while (b != null) {
BufferedSubscription<T> next = b.next;
@ -641,9 +657,11 @@ public class SubmissionPublisher<T> implements Publisher<T>,
public void closeExceptionally(Throwable error) {
if (error == null)
throw new NullPointerException();
ReentrantLock lock = this.lock;
if (!closed) {
BufferedSubscription<T> b;
synchronized (this) {
lock.lock();
try {
b = clients;
if (!closed) { // don't clobber racing close
closedException = error;
@ -651,6 +669,8 @@ public class SubmissionPublisher<T> implements Publisher<T>,
owner = null;
closed = true;
}
} finally {
lock.unlock();
}
while (b != null) {
BufferedSubscription<T> next = b.next;
@ -688,7 +708,9 @@ public class SubmissionPublisher<T> implements Publisher<T>,
*/
public boolean hasSubscribers() {
boolean nonEmpty = false;
synchronized (this) {
ReentrantLock lock = this.lock;
lock.lock();
try {
for (BufferedSubscription<T> b = clients; b != null;) {
BufferedSubscription<T> next = b.next;
if (b.isClosed()) {
@ -700,6 +722,8 @@ public class SubmissionPublisher<T> implements Publisher<T>,
break;
}
}
} finally {
lock.unlock();
}
return nonEmpty;
}
@ -710,9 +734,15 @@ public class SubmissionPublisher<T> implements Publisher<T>,
* @return the number of current subscribers
*/
public int getNumberOfSubscribers() {
synchronized (this) {
return cleanAndCount();
int n;
ReentrantLock lock = this.lock;
lock.lock();
try {
n = cleanAndCount();
} finally {
lock.unlock();
}
return n;
}
/**
@ -742,7 +772,9 @@ public class SubmissionPublisher<T> implements Publisher<T>,
*/
public List<Subscriber<? super T>> getSubscribers() {
ArrayList<Subscriber<? super T>> subs = new ArrayList<>();
synchronized (this) {
ReentrantLock lock = this.lock;
lock.lock();
try {
BufferedSubscription<T> pred = null, next;
for (BufferedSubscription<T> b = clients; b != null; b = next) {
next = b.next;
@ -758,6 +790,8 @@ public class SubmissionPublisher<T> implements Publisher<T>,
pred = b;
}
}
} finally {
lock.unlock();
}
return subs;
}
@ -771,8 +805,11 @@ public class SubmissionPublisher<T> implements Publisher<T>,
*/
public boolean isSubscribed(Subscriber<? super T> subscriber) {
if (subscriber == null) throw new NullPointerException();
boolean subscribed = false;
ReentrantLock lock = this.lock;
if (!closed) {
synchronized (this) {
lock.lock();
try {
BufferedSubscription<T> pred = null, next;
for (BufferedSubscription<T> b = clients; b != null; b = next) {
next = b.next;
@ -783,14 +820,16 @@ public class SubmissionPublisher<T> implements Publisher<T>,
else
pred.next = next;
}
else if (subscriber.equals(b.subscriber))
return true;
else if (subscribed = subscriber.equals(b.subscriber))
break;
else
pred = b;
}
} finally {
lock.unlock();
}
}
return false;
return subscribed;
}
/**
@ -803,7 +842,9 @@ public class SubmissionPublisher<T> implements Publisher<T>,
public long estimateMinimumDemand() {
long min = Long.MAX_VALUE;
boolean nonEmpty = false;
synchronized (this) {
ReentrantLock lock = this.lock;
lock.lock();
try {
BufferedSubscription<T> pred = null, next;
for (BufferedSubscription<T> b = clients; b != null; b = next) {
int n; long d;
@ -822,6 +863,8 @@ public class SubmissionPublisher<T> implements Publisher<T>,
pred = b;
}
}
} finally {
lock.unlock();
}
return nonEmpty ? min : 0;
}
@ -834,7 +877,9 @@ public class SubmissionPublisher<T> implements Publisher<T>,
*/
public int estimateMaximumLag() {
int max = 0;
synchronized (this) {
ReentrantLock lock = this.lock;
lock.lock();
try {
BufferedSubscription<T> pred = null, next;
for (BufferedSubscription<T> b = clients; b != null; b = next) {
int n;
@ -852,6 +897,8 @@ public class SubmissionPublisher<T> implements Publisher<T>,
pred = b;
}
}
} finally {
lock.unlock();
}
return max;
}