diff --git a/src/java.base/share/classes/java/util/concurrent/Executors.java b/src/java.base/share/classes/java/util/concurrent/Executors.java index 60ba265a32b..ac7d5338c66 100644 --- a/src/java.base/share/classes/java/util/concurrent/Executors.java +++ b/src/java.base/share/classes/java/util/concurrent/Executors.java @@ -36,6 +36,7 @@ package java.util.concurrent; import static java.lang.ref.Reference.reachabilityFence; +import java.lang.ref.Cleaner.Cleanable; import java.security.AccessControlContext; import java.security.AccessControlException; import java.security.AccessController; @@ -46,6 +47,7 @@ import java.util.Collection; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import jdk.internal.javac.PreviewFeature; +import jdk.internal.ref.CleanerFactory; import sun.security.util.SecurityConstants; /** @@ -173,10 +175,7 @@ public class Executors { * @return the newly created single-threaded Executor */ public static ExecutorService newSingleThreadExecutor() { - return new FinalizableDelegatedExecutorService - (new ThreadPoolExecutor(1, 1, - 0L, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue())); + return newSingleThreadExecutor(defaultThreadFactory()); } /** @@ -192,7 +191,7 @@ public class Executors { * @throws NullPointerException if threadFactory is null */ public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { - return new FinalizableDelegatedExecutorService + return new AutoShutdownDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), @@ -759,7 +758,11 @@ public class Executors { e.execute(command); } finally { reachabilityFence(this); } } - public void shutdown() { e.shutdown(); } + public void shutdown() { + try { + e.shutdown(); + } finally { reachabilityFence(this); } + } public List shutdownNow() { try { return e.shutdownNow(); @@ -824,14 +827,28 @@ public class Executors { } } - private static class FinalizableDelegatedExecutorService + /** + * A DelegatedExecutorService that uses a Cleaner to shut down the underlying + * ExecutorService when the wrapper becomes phantom reachable. + */ + private static class AutoShutdownDelegatedExecutorService extends DelegatedExecutorService { - FinalizableDelegatedExecutorService(ExecutorService executor) { + private final Cleanable cleanable; + AutoShutdownDelegatedExecutorService(ExecutorService executor) { super(executor); + Runnable action = () -> { + if (!executor.isShutdown()) { + PrivilegedAction pa = () -> { executor.shutdown(); return null; }; + @SuppressWarnings("removal") + var ignore = AccessController.doPrivileged(pa); + } + }; + cleanable = CleanerFactory.cleaner().register(this, action); } - @SuppressWarnings("removal") - protected void finalize() { + @Override + public void shutdown() { super.shutdown(); + cleanable.clean(); // unregisters the cleanable } } diff --git a/test/jdk/java/util/concurrent/Executors/AutoShutdown.java b/test/jdk/java/util/concurrent/Executors/AutoShutdown.java index dcb8dfbae2a..2b7fa7b883f 100644 --- a/test/jdk/java/util/concurrent/Executors/AutoShutdown.java +++ b/test/jdk/java/util/concurrent/Executors/AutoShutdown.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2006, 2018, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2006, 2023, Oracle and/or its affiliates. All rights reserved. * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. * * This code is free software; you can redistribute it and/or modify it @@ -23,89 +23,114 @@ /* * @test - * @bug 6399443 - * @summary Check for auto-shutdown and gc of singleThreadExecutors - * @library /test/lib - * @run main/othervm/timeout=1000 AutoShutdown - * @author Martin Buchholz + * @bug 6399443 8302899 + * @summary Test that Executors.newSingleThreadExecutor wraps an ExecutorService that + * automatically shuts down and terminates when the wrapper is GC'ed + * @modules java.base/java.util.concurrent:+open + * @run junit AutoShutdown */ -import static java.util.concurrent.Executors.defaultThreadFactory; -import static java.util.concurrent.Executors.newSingleThreadExecutor; - -import static java.util.concurrent.TimeUnit.MILLISECONDS; - -import java.lang.ref.WeakReference; -import java.util.Arrays; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executor; +import java.lang.reflect.Field; +import java.time.Duration; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import jdk.test.lib.Utils; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Supplier; +import java.util.stream.Stream; +import java.util.stream.IntStream; -public class AutoShutdown { - static final long LONG_DELAY_MS = Utils.adjustTimeout(10_000); +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.Arguments; +import static org.junit.jupiter.api.Assertions.*; - static void await(CountDownLatch latch) throws InterruptedException { - if (!latch.await(LONG_DELAY_MS, MILLISECONDS)) - throw new AssertionError("timed out waiting for latch"); +class AutoShutdown { + + private static Stream> executors() { + return Stream.of( + () -> Executors.newSingleThreadExecutor(), + () -> Executors.newSingleThreadExecutor(Executors.defaultThreadFactory()) + ); } - private static void realMain(String[] args) throws Throwable { - final Executor[] executors = { - newSingleThreadExecutor(), - newSingleThreadExecutor(defaultThreadFactory()), - // TODO: should these executors also auto-shutdown? - //newFixedThreadPool(1), - //newSingleThreadScheduledExecutor(), - //newSingleThreadScheduledExecutor(defaultThreadFactory()), - }; - final ConcurrentLinkedQueue> poolThreads - = new ConcurrentLinkedQueue<>(); - final CountDownLatch threadStarted - = new CountDownLatch(executors.length); - final CountDownLatch pleaseProceed - = new CountDownLatch(1); - Runnable task = new Runnable() { public void run() { - try { - poolThreads.add(new WeakReference<>(Thread.currentThread())); - threadStarted.countDown(); - await(pleaseProceed); - } catch (Throwable t) { unexpected(t); } - }}; - for (Executor executor : executors) - executor.execute(task); - await(threadStarted); - pleaseProceed.countDown(); - Arrays.fill(executors, null); // make executors unreachable - boolean done = false; - for (long timeout = 1L; !done && timeout <= 128L; timeout *= 2) { - System.gc(); - done = true; - for (WeakReference ref : poolThreads) { - Thread thread = ref.get(); - if (thread != null) { - TimeUnit.SECONDS.timedJoin(thread, timeout); - if (thread.isAlive()) - done = false; - } - } + private static Stream executorAndQueuedTaskCounts() { + int[] queuedTaskCounts = { 0, 1, 2 }; + return executors().flatMap(s -> IntStream.of(queuedTaskCounts) + .mapToObj(i -> Arguments.of(s, i))); + } + + /** + * SingleThreadExecutor with no worker threads. + */ + @ParameterizedTest + @MethodSource("executors") + void testNoWorker(Supplier supplier) throws Exception { + ExecutorService executor = supplier.get(); + ExecutorService delegate = getDelegate(executor); + executor = null; + gcAndAwaitTermination(delegate); + } + + /** + * SingleThreadExecutor with an idle worker thread. + */ + @ParameterizedTest + @MethodSource("executors") + void testIdleWorker(Supplier supplier) throws Exception { + ExecutorService executor = supplier.get(); + // submit a task to get a worker to start + executor.submit(() -> null).get(); + ExecutorService delegate = getDelegate(executor); + executor = null; + gcAndAwaitTermination(delegate); + } + + /** + * SingleThreadExecutor with an active worker and queued tasks. + */ + @ParameterizedTest + @MethodSource("executorAndQueuedTaskCounts") + void testActiveWorker(Supplier supplier,int queuedTaskCount) throws Exception { + ExecutorService executor = supplier.get(); + // the worker will execute one task, the other tasks will be queued + int ntasks = 1 + queuedTaskCount; + AtomicInteger completedTaskCount = new AtomicInteger(); + for (int i = 0; i < ntasks; i++) { + executor.submit(() -> { + Thread.sleep(Duration.ofMillis(500)); + completedTaskCount.incrementAndGet(); + return null; + }); } - if (!done) - throw new AssertionError("pool threads did not terminate"); + ExecutorService delegate = getDelegate(executor); + executor = null; + gcAndAwaitTermination(delegate); + assertEquals(ntasks, completedTaskCount.get()); } - //--------------------- Infrastructure --------------------------- - static volatile int passed = 0, failed = 0; - static void pass() {passed++;} - static void fail() {failed++; Thread.dumpStack();} - static void fail(String msg) {System.out.println(msg); fail();} - static void unexpected(Throwable t) {failed++; t.printStackTrace();} - static void equal(Object x, Object y) { - if (x == null ? y == null : x.equals(y)) pass(); - else fail(x + " not equal to " + y);} - public static void main(String[] args) throws Throwable { - try {realMain(args);} catch (Throwable t) {unexpected(t);} - System.out.printf("%nPassed = %d, failed = %d%n%n", passed, failed); - if (failed > 0) throw new AssertionError("Some tests failed");} + /** + * Returns the delegate for the given ExecutorService. The given ExecutorService + * must be a Executors$DelegatedExecutorService. + */ + private ExecutorService getDelegate(ExecutorService executor) throws Exception { + Field eField = Class.forName("java.util.concurrent.Executors$DelegatedExecutorService") + .getDeclaredField("e"); + eField.setAccessible(true); + return (ExecutorService) eField.get(executor); + } + + /** + * Invokes System.gc and waits for the given ExecutorService to terminate. + */ + private void gcAndAwaitTermination(ExecutorService executor) throws Exception { + System.err.println(executor); + boolean terminated = false; + while (!terminated) { + System.gc(); + terminated = executor.awaitTermination(100, TimeUnit.MILLISECONDS); + } + } } +