mirror of
https://github.com/openjdk/jdk.git
synced 2025-08-27 23:04:50 +02:00
7127792: Add the ability to change an existing PeriodicTask's execution interval
Enables dynamic enrollment / disenrollment from the PeriodicTasks in WatcherThread. Reviewed-by: dholmes, mgronlun
This commit is contained in:
parent
61a5a58cb1
commit
e1d995ab86
6 changed files with 162 additions and 92 deletions
|
@ -140,6 +140,7 @@ Monitor* JfrQuery_lock = NULL;
|
|||
Monitor* JfrMsg_lock = NULL;
|
||||
Mutex* JfrBuffer_lock = NULL;
|
||||
Mutex* JfrStream_lock = NULL;
|
||||
Monitor* PeriodicTask_lock = NULL;
|
||||
|
||||
#define MAX_NUM_MUTEX 128
|
||||
static Monitor * _mutex_array[MAX_NUM_MUTEX];
|
||||
|
@ -285,6 +286,7 @@ void mutex_init() {
|
|||
def(JfrMsg_lock , Monitor, nonleaf+2, true);
|
||||
def(JfrBuffer_lock , Mutex, nonleaf+3, true);
|
||||
def(JfrStream_lock , Mutex, nonleaf+4, true);
|
||||
def(PeriodicTask_lock , Monitor, nonleaf+5, true);
|
||||
}
|
||||
|
||||
GCMutexLocker::GCMutexLocker(Monitor * mutex) {
|
||||
|
|
|
@ -142,6 +142,7 @@ extern Monitor* JfrQuery_lock; // protects JFR use
|
|||
extern Monitor* JfrMsg_lock; // protects JFR messaging
|
||||
extern Mutex* JfrBuffer_lock; // protects JFR buffer operations
|
||||
extern Mutex* JfrStream_lock; // protects JFR stream access
|
||||
extern Monitor* PeriodicTask_lock; // protects the periodic task structure
|
||||
|
||||
// A MutexLocker provides mutual exclusion with respect to a given mutex
|
||||
// for the scope which contains the locker. The lock is an OS lock, not
|
||||
|
|
|
@ -61,7 +61,7 @@ void PeriodicTask::print_intervals() {
|
|||
}
|
||||
#endif
|
||||
|
||||
void PeriodicTask::real_time_tick(size_t delay_time) {
|
||||
void PeriodicTask::real_time_tick(int delay_time) {
|
||||
#ifndef PRODUCT
|
||||
if (ProfilerCheckIntervals) {
|
||||
_ticks++;
|
||||
|
@ -73,7 +73,11 @@ void PeriodicTask::real_time_tick(size_t delay_time) {
|
|||
_intervalHistogram[ms]++;
|
||||
}
|
||||
#endif
|
||||
|
||||
{
|
||||
MutexLockerEx ml(PeriodicTask_lock, Mutex::_no_safepoint_check_flag);
|
||||
int orig_num_tasks = _num_tasks;
|
||||
|
||||
for(int index = 0; index < _num_tasks; index++) {
|
||||
_tasks[index]->execute_if_pending(delay_time);
|
||||
if (_num_tasks < orig_num_tasks) { // task dis-enrolled itself
|
||||
|
@ -81,11 +85,27 @@ void PeriodicTask::real_time_tick(size_t delay_time) {
|
|||
orig_num_tasks = _num_tasks;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int PeriodicTask::time_to_wait() {
|
||||
MutexLockerEx ml(PeriodicTask_lock->owned_by_self() ?
|
||||
NULL : PeriodicTask_lock, Mutex::_no_safepoint_check_flag);
|
||||
|
||||
if (_num_tasks == 0) {
|
||||
return 0; // sleep until shutdown or a task is enrolled
|
||||
}
|
||||
|
||||
int delay = _tasks[0]->time_to_next_interval();
|
||||
for (int index = 1; index < _num_tasks; index++) {
|
||||
delay = MIN2(delay, _tasks[index]->time_to_next_interval());
|
||||
}
|
||||
return delay;
|
||||
}
|
||||
|
||||
|
||||
PeriodicTask::PeriodicTask(size_t interval_time) :
|
||||
_counter(0), _interval(interval_time) {
|
||||
_counter(0), _interval((int) interval_time) {
|
||||
// Sanity check the interval time
|
||||
assert(_interval >= PeriodicTask::min_interval &&
|
||||
_interval <= PeriodicTask::max_interval &&
|
||||
|
@ -94,33 +114,40 @@ PeriodicTask::PeriodicTask(size_t interval_time) :
|
|||
}
|
||||
|
||||
PeriodicTask::~PeriodicTask() {
|
||||
if (is_enrolled())
|
||||
disenroll();
|
||||
}
|
||||
|
||||
bool PeriodicTask::is_enrolled() const {
|
||||
for(int index = 0; index < _num_tasks; index++)
|
||||
if (_tasks[index] == this) return true;
|
||||
return false;
|
||||
}
|
||||
|
||||
void PeriodicTask::enroll() {
|
||||
assert(WatcherThread::watcher_thread() == NULL, "dynamic enrollment of tasks not yet supported");
|
||||
MutexLockerEx ml(PeriodicTask_lock->owned_by_self() ?
|
||||
NULL : PeriodicTask_lock, Mutex::_no_safepoint_check_flag);
|
||||
|
||||
if (_num_tasks == PeriodicTask::max_tasks)
|
||||
if (_num_tasks == PeriodicTask::max_tasks) {
|
||||
fatal("Overflow in PeriodicTask table");
|
||||
}
|
||||
_tasks[_num_tasks++] = this;
|
||||
|
||||
WatcherThread* thread = WatcherThread::watcher_thread();
|
||||
if (thread) {
|
||||
thread->unpark();
|
||||
} else {
|
||||
WatcherThread::start();
|
||||
}
|
||||
}
|
||||
|
||||
void PeriodicTask::disenroll() {
|
||||
assert(WatcherThread::watcher_thread() == NULL ||
|
||||
Thread::current() == WatcherThread::watcher_thread(),
|
||||
"dynamic disenrollment currently only handled from WatcherThread from within task() method");
|
||||
MutexLockerEx ml(PeriodicTask_lock->owned_by_self() ?
|
||||
NULL : PeriodicTask_lock, Mutex::_no_safepoint_check_flag);
|
||||
|
||||
int index;
|
||||
for(index = 0; index < _num_tasks && _tasks[index] != this; index++);
|
||||
if (index == _num_tasks) return;
|
||||
for(index = 0; index < _num_tasks && _tasks[index] != this; index++)
|
||||
;
|
||||
|
||||
if (index == _num_tasks) {
|
||||
return;
|
||||
}
|
||||
|
||||
_num_tasks--;
|
||||
|
||||
for (; index < _num_tasks; index++) {
|
||||
_tasks[index] = _tasks[index+1];
|
||||
}
|
||||
|
|
|
@ -49,12 +49,12 @@ class PeriodicTask: public CHeapObj<mtInternal> {
|
|||
static int num_tasks() { return _num_tasks; }
|
||||
|
||||
private:
|
||||
size_t _counter;
|
||||
const size_t _interval;
|
||||
int _counter;
|
||||
const int _interval;
|
||||
|
||||
static int _num_tasks;
|
||||
static PeriodicTask* _tasks[PeriodicTask::max_tasks];
|
||||
static void real_time_tick(size_t delay_time);
|
||||
static void real_time_tick(int delay_time);
|
||||
|
||||
#ifndef PRODUCT
|
||||
static elapsedTimer _timer; // measures time between ticks
|
||||
|
@ -69,51 +69,36 @@ class PeriodicTask: public CHeapObj<mtInternal> {
|
|||
PeriodicTask(size_t interval_time); // interval is in milliseconds of elapsed time
|
||||
~PeriodicTask();
|
||||
|
||||
// Tells whether is enrolled
|
||||
bool is_enrolled() const;
|
||||
|
||||
// Make the task active
|
||||
// NOTE: this may only be called before the WatcherThread has been started
|
||||
// For dynamic enrollment at the time T, the task will execute somewhere
|
||||
// between T and T + interval_time.
|
||||
void enroll();
|
||||
|
||||
// Make the task deactive
|
||||
// NOTE: this may only be called either while the WatcherThread is
|
||||
// inactive or by a task from within its task() method. One-shot or
|
||||
// several-shot tasks may be implemented this way.
|
||||
void disenroll();
|
||||
|
||||
void execute_if_pending(size_t delay_time) {
|
||||
_counter += delay_time;
|
||||
if (_counter >= _interval) {
|
||||
void execute_if_pending(int delay_time) {
|
||||
// make sure we don't overflow
|
||||
jlong tmp = (jlong) _counter + (jlong) delay_time;
|
||||
|
||||
if (tmp >= (jlong) _interval) {
|
||||
_counter = 0;
|
||||
task();
|
||||
} else {
|
||||
_counter += delay_time;
|
||||
}
|
||||
}
|
||||
|
||||
// Returns how long (time in milliseconds) before the next time we should
|
||||
// execute this task.
|
||||
size_t time_to_next_interval() const {
|
||||
int time_to_next_interval() const {
|
||||
assert(_interval > _counter, "task counter greater than interval?");
|
||||
return _interval - _counter;
|
||||
}
|
||||
|
||||
// Calculate when the next periodic task will fire.
|
||||
// Called by the WatcherThread's run method.
|
||||
// This assumes that periodic tasks aren't entering the system
|
||||
// dynamically, except for during startup.
|
||||
static size_t time_to_wait() {
|
||||
if (_num_tasks == 0) {
|
||||
// Don't wait any more; shut down the thread since we don't
|
||||
// currently support dynamic enrollment.
|
||||
return 0;
|
||||
}
|
||||
|
||||
size_t delay = _tasks[0]->time_to_next_interval();
|
||||
for (int index = 1; index < _num_tasks; index++) {
|
||||
delay = MIN2(delay, _tasks[index]->time_to_next_interval());
|
||||
}
|
||||
return delay;
|
||||
}
|
||||
static int time_to_wait();
|
||||
|
||||
// The task to perform at each period
|
||||
virtual void task() = 0;
|
||||
|
|
|
@ -1217,6 +1217,7 @@ void NamedThread::set_name(const char* format, ...) {
|
|||
// timer interrupts exists on the platform.
|
||||
|
||||
WatcherThread* WatcherThread::_watcher_thread = NULL;
|
||||
bool WatcherThread::_startable = false;
|
||||
volatile bool WatcherThread::_should_terminate = false;
|
||||
|
||||
WatcherThread::WatcherThread() : Thread() {
|
||||
|
@ -1237,6 +1238,55 @@ WatcherThread::WatcherThread() : Thread() {
|
|||
}
|
||||
}
|
||||
|
||||
int WatcherThread::sleep() const {
|
||||
MutexLockerEx ml(PeriodicTask_lock, Mutex::_no_safepoint_check_flag);
|
||||
|
||||
// remaining will be zero if there are no tasks,
|
||||
// causing the WatcherThread to sleep until a task is
|
||||
// enrolled
|
||||
int remaining = PeriodicTask::time_to_wait();
|
||||
int time_slept = 0;
|
||||
|
||||
// we expect this to timeout - we only ever get unparked when
|
||||
// we should terminate or when a new task has been enrolled
|
||||
OSThreadWaitState osts(this->osthread(), false /* not Object.wait() */);
|
||||
|
||||
jlong time_before_loop = os::javaTimeNanos();
|
||||
|
||||
for (;;) {
|
||||
bool timedout = PeriodicTask_lock->wait(Mutex::_no_safepoint_check_flag, remaining);
|
||||
jlong now = os::javaTimeNanos();
|
||||
|
||||
if (remaining == 0) {
|
||||
// if we didn't have any tasks we could have waited for a long time
|
||||
// consider the time_slept zero and reset time_before_loop
|
||||
time_slept = 0;
|
||||
time_before_loop = now;
|
||||
} else {
|
||||
// need to recalulate since we might have new tasks in _tasks
|
||||
time_slept = (int) ((now - time_before_loop) / 1000000);
|
||||
}
|
||||
|
||||
// Change to task list or spurious wakeup of some kind
|
||||
if (timedout || _should_terminate) {
|
||||
break;
|
||||
}
|
||||
|
||||
remaining = PeriodicTask::time_to_wait();
|
||||
if (remaining == 0) {
|
||||
// Last task was just disenrolled so loop around and wait until
|
||||
// another task gets enrolled
|
||||
continue;
|
||||
}
|
||||
|
||||
remaining -= time_slept;
|
||||
if (remaining <= 0)
|
||||
break;
|
||||
}
|
||||
|
||||
return time_slept;
|
||||
}
|
||||
|
||||
void WatcherThread::run() {
|
||||
assert(this == watcher_thread(), "just checking");
|
||||
|
||||
|
@ -1249,26 +1299,7 @@ void WatcherThread::run() {
|
|||
|
||||
// Calculate how long it'll be until the next PeriodicTask work
|
||||
// should be done, and sleep that amount of time.
|
||||
size_t time_to_wait = PeriodicTask::time_to_wait();
|
||||
|
||||
// we expect this to timeout - we only ever get unparked when
|
||||
// we should terminate
|
||||
{
|
||||
OSThreadWaitState osts(this->osthread(), false /* not Object.wait() */);
|
||||
|
||||
jlong prev_time = os::javaTimeNanos();
|
||||
for (;;) {
|
||||
int res= _SleepEvent->park(time_to_wait);
|
||||
if (res == OS_TIMEOUT || _should_terminate)
|
||||
break;
|
||||
// spurious wakeup of some kind
|
||||
jlong now = os::javaTimeNanos();
|
||||
time_to_wait -= (now - prev_time) / 1000000;
|
||||
if (time_to_wait <= 0)
|
||||
break;
|
||||
prev_time = now;
|
||||
}
|
||||
}
|
||||
int time_waited = sleep();
|
||||
|
||||
if (is_error_reported()) {
|
||||
// A fatal error has happened, the error handler(VMError::report_and_die)
|
||||
|
@ -1298,13 +1329,7 @@ void WatcherThread::run() {
|
|||
}
|
||||
}
|
||||
|
||||
PeriodicTask::real_time_tick(time_to_wait);
|
||||
|
||||
// If we have no more tasks left due to dynamic disenrollment,
|
||||
// shut down the thread since we don't currently support dynamic enrollment
|
||||
if (PeriodicTask::num_tasks() == 0) {
|
||||
_should_terminate = true;
|
||||
}
|
||||
PeriodicTask::real_time_tick(time_waited);
|
||||
}
|
||||
|
||||
// Signal that it is terminated
|
||||
|
@ -1319,22 +1344,33 @@ void WatcherThread::run() {
|
|||
}
|
||||
|
||||
void WatcherThread::start() {
|
||||
if (watcher_thread() == NULL) {
|
||||
assert(PeriodicTask_lock->owned_by_self(), "PeriodicTask_lock required");
|
||||
|
||||
if (watcher_thread() == NULL && _startable) {
|
||||
_should_terminate = false;
|
||||
// Create the single instance of WatcherThread
|
||||
new WatcherThread();
|
||||
}
|
||||
}
|
||||
|
||||
void WatcherThread::make_startable() {
|
||||
assert(PeriodicTask_lock->owned_by_self(), "PeriodicTask_lock required");
|
||||
_startable = true;
|
||||
}
|
||||
|
||||
void WatcherThread::stop() {
|
||||
// it is ok to take late safepoints here, if needed
|
||||
MutexLocker mu(Terminator_lock);
|
||||
{
|
||||
MutexLockerEx ml(PeriodicTask_lock, Mutex::_no_safepoint_check_flag);
|
||||
_should_terminate = true;
|
||||
OrderAccess::fence(); // ensure WatcherThread sees update in main loop
|
||||
|
||||
Thread* watcher = watcher_thread();
|
||||
WatcherThread* watcher = watcher_thread();
|
||||
if (watcher != NULL)
|
||||
watcher->_SleepEvent->unpark();
|
||||
watcher->unpark();
|
||||
}
|
||||
|
||||
// it is ok to take late safepoints here, if needed
|
||||
MutexLocker mu(Terminator_lock);
|
||||
|
||||
while(watcher_thread() != NULL) {
|
||||
// This wait should make safepoint checks, wait without a timeout,
|
||||
|
@ -1352,6 +1388,11 @@ void WatcherThread::stop() {
|
|||
}
|
||||
}
|
||||
|
||||
void WatcherThread::unpark() {
|
||||
MutexLockerEx ml(PeriodicTask_lock->owned_by_self() ? NULL : PeriodicTask_lock, Mutex::_no_safepoint_check_flag);
|
||||
PeriodicTask_lock->notify();
|
||||
}
|
||||
|
||||
void WatcherThread::print_on(outputStream* st) const {
|
||||
st->print("\"%s\" ", name());
|
||||
Thread::print_on(st);
|
||||
|
@ -3658,6 +3699,11 @@ jint Threads::create_vm(JavaVMInitArgs* args, bool* canTryAgain) {
|
|||
}
|
||||
}
|
||||
|
||||
{
|
||||
MutexLockerEx ml(PeriodicTask_lock, Mutex::_no_safepoint_check_flag);
|
||||
// Make sure the watcher thread can be started by WatcherThread::start()
|
||||
// or by dynamic enrollment.
|
||||
WatcherThread::make_startable();
|
||||
// Start up the WatcherThread if there are any periodic tasks
|
||||
// NOTE: All PeriodicTasks should be registered by now. If they
|
||||
// aren't, late joiners might appear to start slowly (we might
|
||||
|
@ -3665,6 +3711,7 @@ jint Threads::create_vm(JavaVMInitArgs* args, bool* canTryAgain) {
|
|||
if (PeriodicTask::num_tasks() > 0) {
|
||||
WatcherThread::start();
|
||||
}
|
||||
}
|
||||
|
||||
// Give os specific code one last chance to start
|
||||
os::init_3();
|
||||
|
|
|
@ -722,6 +722,7 @@ class WatcherThread: public Thread {
|
|||
private:
|
||||
static WatcherThread* _watcher_thread;
|
||||
|
||||
static bool _startable;
|
||||
volatile static bool _should_terminate; // updated without holding lock
|
||||
public:
|
||||
enum SomeConstants {
|
||||
|
@ -738,6 +739,7 @@ class WatcherThread: public Thread {
|
|||
char* name() const { return (char*)"VM Periodic Task Thread"; }
|
||||
void print_on(outputStream* st) const;
|
||||
void print() const { print_on(tty); }
|
||||
void unpark();
|
||||
|
||||
// Returns the single instance of WatcherThread
|
||||
static WatcherThread* watcher_thread() { return _watcher_thread; }
|
||||
|
@ -745,6 +747,12 @@ class WatcherThread: public Thread {
|
|||
// Create and start the single instance of WatcherThread, or stop it on shutdown
|
||||
static void start();
|
||||
static void stop();
|
||||
// Only allow start once the VM is sufficiently initialized
|
||||
// Otherwise the first task to enroll will trigger the start
|
||||
static void make_startable();
|
||||
|
||||
private:
|
||||
int sleep() const;
|
||||
};
|
||||
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue