8347274: Gatherers.mapConcurrent exhibits undesired behavior under variable delays, interruption, and finishing

Reviewed-by: alanb
This commit is contained in:
Viktor Klang 2025-01-13 10:38:02 +00:00
parent 82e2a79122
commit 450636ae28
2 changed files with 136 additions and 59 deletions

View file

@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, 2024, Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2023, 2025, Oracle and/or its affiliates. All rights reserved.
* DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
* This code is free software; you can redistribute it and/or modify it
@ -30,6 +30,7 @@ import jdk.internal.vm.annotation.ForceInline;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
@ -350,86 +351,103 @@ public final class Gatherers {
final int maxConcurrency,
final Function<? super T, ? extends R> mapper) {
if (maxConcurrency < 1)
throw new IllegalArgumentException(
"'maxConcurrency' must be greater than 0");
throw new IllegalArgumentException("'maxConcurrency' must be greater than 0");
Objects.requireNonNull(mapper, "'mapper' must not be null");
class State {
// ArrayDeque default initial size is 16
final ArrayDeque<Future<R>> window =
new ArrayDeque<>(Math.min(maxConcurrency, 16));
final Semaphore windowLock = new Semaphore(maxConcurrency);
final class MapConcurrentTask extends FutureTask<R> {
final Thread thread;
private MapConcurrentTask(Callable<R> callable) {
super(callable);
this.thread = Thread.ofVirtual().unstarted(this);
}
}
final boolean integrate(T element,
Downstream<? super R> downstream) {
if (!downstream.isRejecting())
createTaskFor(element);
return flush(0, downstream);
final class State {
private final ArrayDeque<MapConcurrentTask> wip =
new ArrayDeque<>(Math.min(maxConcurrency, 16));
boolean integrate(T element, Downstream<? super R> downstream) {
// Prepare the next task and add it to the work-in-progress
final var task = new MapConcurrentTask(() -> mapper.apply(element));
wip.addLast(task);
assert wip.peekLast() == task;
assert wip.size() <= maxConcurrency;
// Start the next task
task.thread.start();
// Flush at least 1 element if we're at capacity
return flush(wip.size() < maxConcurrency ? 0 : 1, downstream);
}
final void createTaskFor(T element) {
windowLock.acquireUninterruptibly();
var task = new FutureTask<R>(() -> {
try {
return mapper.apply(element);
} finally {
windowLock.release();
}
});
var wasAddedToWindow = window.add(task);
assert wasAddedToWindow;
Thread.startVirtualThread(task);
}
final boolean flush(long atLeastN,
Downstream<? super R> downstream) {
boolean proceed = !downstream.isRejecting();
boolean interrupted = false;
boolean flush(long atLeastN, Downstream<? super R> downstream) {
boolean success = false, interrupted = false;
try {
Future<R> current;
while (proceed
&& (current = window.peek()) != null
&& (current.isDone() || atLeastN > 0)) {
proceed &= downstream.push(current.get());
boolean proceed = !downstream.isRejecting();
MapConcurrentTask current;
while (
proceed
&& (current = wip.peekFirst()) != null
&& (current.isDone() || atLeastN > 0)
) {
R result;
// Ensure that the task is done before proceeding
for (;;) {
try {
result = current.get();
break;
} catch (InterruptedException ie) {
interrupted = true; // ignore for now, and restore later
}
}
proceed &= downstream.push(result);
atLeastN -= 1;
var correctRemoval = window.pop() == current;
final var correctRemoval = wip.pollFirst() == current;
assert correctRemoval;
}
} catch(InterruptedException ie) {
proceed = false;
interrupted = true;
return (success = proceed); // Ensure that cleanup occurs if needed
} catch (ExecutionException e) {
proceed = false; // Ensure cleanup
final var cause = e.getCause();
throw (cause instanceof RuntimeException re)
? re
: new RuntimeException(cause == null ? e : cause);
} finally {
// Clean up
if (!proceed) {
Future<R> next;
while ((next = window.pollFirst()) != null) {
next.cancel(true);
// Clean up work-in-progress
if (!success && !wip.isEmpty()) {
// First signal cancellation for all tasks in progress
for (var task : wip)
task.cancel(true);
// Then wait for all in progress task Threads to exit
MapConcurrentTask next;
while ((next = wip.pollFirst()) != null) {
while (next.thread.isAlive()) {
try {
next.thread.join();
} catch (InterruptedException ie) {
interrupted = true; // ignore, for now, and restore later
}
}
}
}
// integrate(..) could be called from different threads each time
// so we need to restore the interrupt on the calling thread
if (interrupted)
Thread.currentThread().interrupt();
}
if (interrupted)
Thread.currentThread().interrupt();
return proceed;
}
}
return Gatherer.ofSequential(
State::new,
Integrator.<State, T, R>ofGreedy(State::integrate),
(state, downstream) -> state.flush(Long.MAX_VALUE, downstream)
State::new,
Integrator.<State, T, R>ofGreedy(State::integrate),
(state, downstream) -> state.flush(Long.MAX_VALUE, downstream)
);
}