src: only block on user blocking worker tasks

we should not be blocking on the worker tasks on the
main thread in one go. Doing so leads to two problems:

1. If any of the worker tasks post another foreground task and wait
   for it to complete, and that foreground task is posted right after
   we flush the foreground task queue and before the foreground thread
   goes into sleep, we'll never be able to wake up to execute that
   foreground task and in turn the worker task will never complete, and
   we have a deadlock.
2. Worker tasks can be posted from any thread, not necessarily
   associated with the current isolate, and we can be blocking on a
   worker task that is associated with a completely unrelated isolate
   in the event loop. This is suboptimal.

However, not blocking on the worker tasks at all can lead to loss of
some critical user-blocking worker tasks e.g. wasm async compilation
tasks, which should block the main thread until they are completed,
as the documentation suggets. As a compromise, we currently only block
on user-blocking tasks to reduce the chance of deadlocks while making
sure that criticl user-blocking tasks are not lost.

PR-URL: https://github.com/nodejs/node/pull/58047
Refs: https://github.com/nodejs/node/pull/47452
Refs: https://github.com/nodejs/node/issues/54918
Reviewed-By: Stephen Belanger <admin@stephenbelanger.com>
This commit is contained in:
Joyee Cheung 2025-05-02 19:31:38 +02:00 committed by Node.js GitHub Bot
parent fa3c0e00d1
commit 5fb879c458
2 changed files with 48 additions and 15 deletions

View file

@ -78,7 +78,10 @@ static void PlatformWorkerThread(void* data) {
fflush(stderr);
}
entry->task->Run();
pending_worker_tasks->Lock().NotifyOfCompletion();
// See NodePlatform::DrainTasks().
if (entry->is_outstanding()) {
pending_worker_tasks->Lock().NotifyOfOutstandingCompletion();
}
}
}
@ -209,8 +212,10 @@ class WorkerThreadsTaskRunner::DelayedTaskScheduler {
static void RunTask(uv_timer_t* timer) {
DelayedTaskScheduler* scheduler =
ContainerOf(&DelayedTaskScheduler::loop_, timer->loop);
scheduler->pending_worker_tasks_->Lock().Push(
scheduler->TakeTimerTask(timer));
auto entry = scheduler->TakeTimerTask(timer);
bool is_outstanding = entry->is_outstanding();
scheduler->pending_worker_tasks_->Lock().Push(std::move(entry),
is_outstanding);
}
std::unique_ptr<TaskQueueEntry> TakeTimerTask(uv_timer_t* timer) {
@ -277,7 +282,8 @@ void WorkerThreadsTaskRunner::PostTask(v8::TaskPriority priority,
std::unique_ptr<v8::Task> task,
const v8::SourceLocation& location) {
auto entry = std::make_unique<TaskQueueEntry>(std::move(task), priority);
pending_worker_tasks_.Lock().Push(std::move(entry));
bool is_outstanding = entry->is_outstanding();
pending_worker_tasks_.Lock().Push(std::move(entry), is_outstanding);
}
void WorkerThreadsTaskRunner::PostDelayedTask(
@ -574,7 +580,25 @@ void NodePlatform::DrainTasks(Isolate* isolate) {
if (!per_isolate) return;
do {
// Worker tasks aren't associated with an Isolate.
// FIXME(54918): we should not be blocking on the worker tasks on the
// main thread in one go. Doing so leads to two problems:
// 1. If any of the worker tasks post another foreground task and wait
// for it to complete, and that foreground task is posted right after
// we flush the foreground task queue and before the foreground thread
// goes into sleep, we'll never be able to wake up to execute that
// foreground task and in turn the worker task will never complete, and
// we have a deadlock.
// 2. Worker tasks can be posted from any thread, not necessarily associated
// with the current isolate, and we can be blocking on a worker task that
// is associated with a completely unrelated isolate in the event loop.
// This is suboptimal.
//
// However, not blocking on the worker tasks at all can lead to loss of some
// critical user-blocking worker tasks e.g. wasm async compilation tasks,
// which should block the main thread until they are completed, as the
// documentation suggets. As a compromise, we currently only block on
// user-blocking tasks to reduce the chance of deadlocks while making sure
// that criticl user-blocking tasks are not lost.
worker_thread_task_runner_->BlockingDrain();
} while (per_isolate->FlushForegroundTasksInternal());
}
@ -748,16 +772,22 @@ v8::PageAllocator* NodePlatform::GetPageAllocator() {
template <class T>
TaskQueue<T>::TaskQueue()
: lock_(), tasks_available_(), tasks_drained_(),
outstanding_tasks_(0), stopped_(false), task_queue_() { }
: lock_(),
tasks_available_(),
outstanding_tasks_drained_(),
outstanding_tasks_(0),
stopped_(false),
task_queue_() {}
template <class T>
TaskQueue<T>::Locked::Locked(TaskQueue* queue)
: queue_(queue), lock_(queue->lock_) {}
template <class T>
void TaskQueue<T>::Locked::Push(std::unique_ptr<T> task) {
queue_->outstanding_tasks_++;
void TaskQueue<T>::Locked::Push(std::unique_ptr<T> task, bool outstanding) {
if (outstanding) {
queue_->outstanding_tasks_++;
}
queue_->task_queue_.push(std::move(task));
queue_->tasks_available_.Signal(lock_);
}
@ -788,16 +818,16 @@ std::unique_ptr<T> TaskQueue<T>::Locked::BlockingPop() {
}
template <class T>
void TaskQueue<T>::Locked::NotifyOfCompletion() {
void TaskQueue<T>::Locked::NotifyOfOutstandingCompletion() {
if (--queue_->outstanding_tasks_ == 0) {
queue_->tasks_drained_.Broadcast(lock_);
queue_->outstanding_tasks_drained_.Broadcast(lock_);
}
}
template <class T>
void TaskQueue<T>::Locked::BlockingDrain() {
while (queue_->outstanding_tasks_ > 0) {
queue_->tasks_drained_.Wait(lock_);
queue_->outstanding_tasks_drained_.Wait(lock_);
}
}

View file

@ -48,10 +48,10 @@ class TaskQueue {
EntryCompare>;
class Locked {
public:
void Push(std::unique_ptr<T> task);
void Push(std::unique_ptr<T> task, bool outstanding = false);
std::unique_ptr<T> Pop();
std::unique_ptr<T> BlockingPop();
void NotifyOfCompletion();
void NotifyOfOutstandingCompletion();
void BlockingDrain();
void Stop();
PriorityQueue PopAll();
@ -72,7 +72,7 @@ class TaskQueue {
private:
Mutex lock_;
ConditionVariable tasks_available_;
ConditionVariable tasks_drained_;
ConditionVariable outstanding_tasks_drained_;
int outstanding_tasks_;
bool stopped_;
PriorityQueue task_queue_;
@ -83,6 +83,9 @@ struct TaskQueueEntry {
v8::TaskPriority priority;
TaskQueueEntry(std::unique_ptr<v8::Task> t, v8::TaskPriority p)
: task(std::move(t)), priority(p) {}
inline bool is_outstanding() const {
return priority == v8::TaskPriority::kUserBlocking;
}
};
struct DelayedTask {