8087324: Use semaphores when starting and stopping GC task threads

Reviewed-by: jmasa, sjohanss
This commit is contained in:
Stefan Karlsson 2015-06-29 11:11:12 +02:00
parent e25bcfd3d3
commit 0e252b2a93
4 changed files with 249 additions and 240 deletions

View file

@ -28,6 +28,8 @@
#include "memory/allocation.inline.hpp"
#include "runtime/atomic.inline.hpp"
#include "runtime/os.hpp"
#include "runtime/semaphore.hpp"
#include "runtime/thread.inline.hpp"
// Definitions of WorkGang methods.
@ -96,87 +98,170 @@ void AbstractWorkGang::threads_do(ThreadClosure* tc) const {
}
}
WorkGang::WorkGang(const char* name,
uint workers,
bool are_GC_task_threads,
bool are_ConcurrentGC_threads) :
AbstractWorkGang(name, workers, are_GC_task_threads, are_ConcurrentGC_threads),
_started_workers(0),
_finished_workers(0),
_sequence_number(0),
_task(NULL) {
// WorkGang dispatcher implemented with semaphores.
//
// Semaphores don't require the worker threads to re-claim the lock when they wake up.
// This helps lowering the latency when starting and stopping the worker threads.
class SemaphoreGangTaskDispatcher : public GangTaskDispatcher {
// The task currently being dispatched to the GangWorkers.
AbstractGangTask* _task;
// Other initialization.
_monitor = new Monitor(/* priority */ Mutex::leaf,
/* name */ "WorkGroup monitor",
/* allow_vm_block */ are_GC_task_threads,
Monitor::_safepoint_check_sometimes);
volatile uint _started;
volatile uint _not_finished;
assert(monitor() != NULL, "Failed to allocate monitor");
// Semaphore used to start the GangWorkers.
Semaphore* _start_semaphore;
// Semaphore used to notify the coordinator that all workers are done.
Semaphore* _end_semaphore;
public:
SemaphoreGangTaskDispatcher() :
_task(NULL),
_started(0),
_not_finished(0),
_start_semaphore(new Semaphore()),
_end_semaphore(new Semaphore())
{ }
~SemaphoreGangTaskDispatcher() {
delete _start_semaphore;
delete _end_semaphore;
}
void coordinator_execute_on_workers(AbstractGangTask* task, uint num_workers) {
// No workers are allowed to read the state variables until they have been signaled.
_task = task;
_not_finished = num_workers;
// Dispatch 'num_workers' number of tasks.
_start_semaphore->signal(num_workers);
// Wait for the last worker to signal the coordinator.
_end_semaphore->wait();
// No workers are allowed to read the state variables after the coordinator has been signaled.
assert(_not_finished == 0, err_msg("%d not finished workers?", _not_finished));
_task = NULL;
_started = 0;
}
WorkData worker_wait_for_task() {
// Wait for the coordinator to dispatch a task.
_start_semaphore->wait();
uint num_started = (uint) Atomic::add(1, (volatile jint*)&_started);
// Subtract one to get a zero-indexed worker id.
uint worker_id = num_started - 1;
return WorkData(_task, worker_id);
}
void worker_done_with_task() {
// Mark that the worker is done with the task.
// The worker is not allowed to read the state variables after this line.
uint not_finished = (uint) Atomic::add(-1, (volatile jint*)&_not_finished);
// The last worker signals to the coordinator that all work is completed.
if (not_finished == 0) {
_end_semaphore->signal();
}
}
};
class MutexGangTaskDispatcher : public GangTaskDispatcher {
AbstractGangTask* _task;
volatile uint _started;
volatile uint _finished;
volatile uint _num_workers;
Monitor* _monitor;
public:
MutexGangTaskDispatcher()
: _task(NULL),
_monitor(new Monitor(Monitor::leaf, "WorkGang dispatcher lock", false, Monitor::_safepoint_check_never)),
_started(0),
_finished(0),
_num_workers(0) {}
~MutexGangTaskDispatcher() {
delete _monitor;
}
void coordinator_execute_on_workers(AbstractGangTask* task, uint num_workers) {
MutexLockerEx ml(_monitor, Mutex::_no_safepoint_check_flag);
_task = task;
_num_workers = num_workers;
// Tell the workers to get to work.
_monitor->notify_all();
// Wait for them to finish.
while (_finished < _num_workers) {
_monitor->wait(/* no_safepoint_check */ true);
}
_task = NULL;
_num_workers = 0;
_started = 0;
_finished = 0;
}
WorkData worker_wait_for_task() {
MonitorLockerEx ml(_monitor, Mutex::_no_safepoint_check_flag);
while (_num_workers == 0 || _started == _num_workers) {
_monitor->wait(/* no_safepoint_check */ true);
}
_started++;
// Subtract one to get a zero-indexed worker id.
uint worker_id = _started - 1;
return WorkData(_task, worker_id);
}
void worker_done_with_task() {
MonitorLockerEx ml(_monitor, Mutex::_no_safepoint_check_flag);
_finished++;
if (_finished == _num_workers) {
// This will wake up all workers and not only the coordinator.
_monitor->notify_all();
}
}
};
static GangTaskDispatcher* create_dispatcher() {
if (UseSemaphoreGCThreadsSynchronization) {
return new SemaphoreGangTaskDispatcher();
}
return new MutexGangTaskDispatcher();
}
WorkGang::WorkGang(const char* name,
uint workers,
bool are_GC_task_threads,
bool are_ConcurrentGC_threads) :
AbstractWorkGang(name, workers, are_GC_task_threads, are_ConcurrentGC_threads),
_dispatcher(create_dispatcher())
{ }
AbstractGangWorker* WorkGang::allocate_worker(uint worker_id) {
return new GangWorker(this, worker_id);
}
void WorkGang::run_task(AbstractGangTask* task) {
run_task(task, (uint)active_workers());
_dispatcher->coordinator_execute_on_workers(task, active_workers());
}
void WorkGang::run_task(AbstractGangTask* task, uint no_of_parallel_workers) {
// This thread is executed by the VM thread which does not block
// on ordinary MutexLocker's.
MutexLockerEx ml(monitor(), Mutex::_no_safepoint_check_flag);
if (TraceWorkGang) {
tty->print_cr("Running work gang %s task %s", name(), task->name());
}
// Tell all the workers to run a task.
assert(task != NULL, "Running a null task");
// Initialize.
_task = task;
_sequence_number += 1;
_started_workers = 0;
_finished_workers = 0;
// Tell the workers to get to work.
monitor()->notify_all();
// Wait for them to be finished
while (finished_workers() < no_of_parallel_workers) {
if (TraceWorkGang) {
tty->print_cr("Waiting in work gang %s: %u/%u finished sequence %d",
name(), finished_workers(), no_of_parallel_workers,
_sequence_number);
}
monitor()->wait(/* no_safepoint_check */ true);
}
_task = NULL;
if (TraceWorkGang) {
tty->print_cr("\nFinished work gang %s: %u/%u sequence %d",
name(), finished_workers(), no_of_parallel_workers,
_sequence_number);
Thread* me = Thread::current();
tty->print_cr(" T: " PTR_FORMAT " VM_thread: %d", p2i(me), me->is_VM_thread());
}
}
void WorkGang::internal_worker_poll(WorkData* data) const {
assert(monitor()->owned_by_self(), "worker_poll is an internal method");
assert(data != NULL, "worker data is null");
data->set_task(task());
data->set_sequence_number(sequence_number());
}
void WorkGang::internal_note_start() {
assert(monitor()->owned_by_self(), "note_finish is an internal method");
_started_workers += 1;
}
void WorkGang::internal_note_finish() {
assert(monitor()->owned_by_self(), "note_finish is an internal method");
_finished_workers += 1;
}
// GangWorker methods.
AbstractGangWorker::AbstractGangWorker(AbstractWorkGang* gang, uint id) {
_gang = gang;
set_id(id);
@ -218,79 +303,43 @@ void AbstractGangWorker::print_on(outputStream* st) const {
st->cr();
}
WorkData GangWorker::wait_for_task() {
return gang()->dispatcher()->worker_wait_for_task();
}
void GangWorker::signal_task_done() {
gang()->dispatcher()->worker_done_with_task();
}
void GangWorker::print_task_started(WorkData data) {
if (TraceWorkGang) {
tty->print_cr("Running work gang %s task %s worker %u", name(), data._task->name(), data._worker_id);
}
}
void GangWorker::print_task_done(WorkData data) {
if (TraceWorkGang) {
tty->print_cr("\nFinished work gang %s task %s worker %u", name(), data._task->name(), data._worker_id);
Thread* me = Thread::current();
tty->print_cr(" T: " PTR_FORMAT " VM_thread: %d", p2i(me), me->is_VM_thread());
}
}
void GangWorker::run_task(WorkData data) {
print_task_started(data);
data._task->work(data._worker_id);
print_task_done(data);
}
void GangWorker::loop() {
int previous_sequence_number = 0;
Monitor* gang_monitor = gang()->monitor();
for ( ; ; ) {
WorkData data;
int part; // Initialized below.
{
// Grab the gang mutex.
MutexLocker ml(gang_monitor);
// Wait for something to do.
// Polling outside the while { wait } avoids missed notifies
// in the outer loop.
gang()->internal_worker_poll(&data);
if (TraceWorkGang) {
tty->print("Polled outside for work in gang %s worker %u",
gang()->name(), id());
tty->print(" sequence: %d (prev: %d)",
data.sequence_number(), previous_sequence_number);
if (data.task() != NULL) {
tty->print(" task: %s", data.task()->name());
} else {
tty->print(" task: NULL");
}
tty->cr();
}
for ( ; /* break */; ) {
// Check for new work.
if ((data.task() != NULL) &&
(data.sequence_number() != previous_sequence_number)) {
if (gang()->needs_more_workers()) {
gang()->internal_note_start();
gang_monitor->notify_all();
part = gang()->started_workers() - 1;
break;
}
}
// Nothing to do.
gang_monitor->wait(/* no_safepoint_check */ true);
gang()->internal_worker_poll(&data);
if (TraceWorkGang) {
tty->print("Polled inside for work in gang %s worker %u",
gang()->name(), id());
tty->print(" sequence: %d (prev: %d)",
data.sequence_number(), previous_sequence_number);
if (data.task() != NULL) {
tty->print(" task: %s", data.task()->name());
} else {
tty->print(" task: NULL");
}
tty->cr();
}
}
// Drop gang mutex.
}
if (TraceWorkGang) {
tty->print("Work for work gang %s id %u task %s part %d",
gang()->name(), id(), data.task()->name(), part);
}
assert(data.task() != NULL, "Got null task");
data.task()->work(part);
{
if (TraceWorkGang) {
tty->print("Finish for work gang %s id %u task %s part %d",
gang()->name(), id(), data.task()->name(), part);
}
// Grab the gang mutex.
MutexLocker ml(gang_monitor);
gang()->internal_note_finish();
// Tell the gang you are done.
gang_monitor->notify_all();
// Drop the gang mutex.
}
previous_sequence_number = data.sequence_number();
while (true) {
WorkData data = wait_for_task();
run_task(data);
signal_task_done();
}
}