mirror of
https://github.com/ruby/ruby.git
synced 2025-08-15 05:29:10 +02:00
Fix blocking operation cancellation. (#13614)
Expose `rb_thread_resolve_unblock_function` internally.
This commit is contained in:
parent
c45c600e22
commit
68625a23d6
Notes:
git
2025-06-14 03:33:04 +00:00
Merged-By: ioquatix <samuel@codeotaku.com>
3 changed files with 49 additions and 13 deletions
|
@ -83,6 +83,8 @@ RUBY_SYMBOL_EXPORT_END
|
|||
int rb_threadptr_execute_interrupts(struct rb_thread_struct *th, int blocking_timing);
|
||||
bool rb_thread_mn_schedulable(VALUE thread);
|
||||
|
||||
bool rb_thread_resolve_unblock_function(rb_unblock_function_t **unblock_function, void **data2, struct rb_thread_struct *thread);
|
||||
|
||||
// interrupt exec
|
||||
|
||||
typedef VALUE (rb_interrupt_exec_func_t)(void *data);
|
||||
|
|
29
scheduler.c
29
scheduler.c
|
@ -63,8 +63,10 @@ typedef enum {
|
|||
struct rb_fiber_scheduler_blocking_operation {
|
||||
void *(*function)(void *);
|
||||
void *data;
|
||||
|
||||
rb_unblock_function_t *unblock_function;
|
||||
void *data2;
|
||||
|
||||
int flags;
|
||||
struct rb_fiber_scheduler_blocking_operation_state *state;
|
||||
|
||||
|
@ -208,7 +210,10 @@ rb_fiber_scheduler_blocking_operation_execute(rb_fiber_scheduler_blocking_operat
|
|||
return -1; // Invalid blocking operation
|
||||
}
|
||||
|
||||
// Atomically check if we can transition from QUEUED to EXECUTING
|
||||
// Resolve sentinel values for unblock_function and data2:
|
||||
rb_thread_resolve_unblock_function(&blocking_operation->unblock_function, &blocking_operation->data2, GET_THREAD());
|
||||
|
||||
// Atomically check if we can transition from QUEUED to EXECUTING
|
||||
rb_atomic_t expected = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED;
|
||||
if (RUBY_ATOMIC_CAS(blocking_operation->status, expected, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING) != expected) {
|
||||
// Already cancelled or in wrong state
|
||||
|
@ -1124,25 +1129,33 @@ rb_fiber_scheduler_blocking_operation_cancel(rb_fiber_scheduler_blocking_operati
|
|||
|
||||
rb_atomic_t current_state = RUBY_ATOMIC_LOAD(blocking_operation->status);
|
||||
|
||||
switch (current_state) {
|
||||
switch (current_state) {
|
||||
case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED:
|
||||
// Work hasn't started - just mark as cancelled
|
||||
// Work hasn't started - just mark as cancelled:
|
||||
if (RUBY_ATOMIC_CAS(blocking_operation->status, current_state, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED) == current_state) {
|
||||
return 0; // Successfully cancelled before execution
|
||||
// Successfully cancelled before execution:
|
||||
return 0;
|
||||
}
|
||||
// Fall through if state changed between load and CAS
|
||||
|
||||
case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING:
|
||||
// Work is running - mark cancelled AND call unblock function
|
||||
RUBY_ATOMIC_SET(blocking_operation->status, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED);
|
||||
if (blocking_operation->unblock_function) {
|
||||
if (RUBY_ATOMIC_CAS(blocking_operation->status, current_state, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED) != current_state) {
|
||||
// State changed between load and CAS - operation may have completed:
|
||||
return 0;
|
||||
}
|
||||
// Otherwise, we successfully marked it as cancelled, so we can call the unblock function:
|
||||
rb_unblock_function_t *unblock_function = blocking_operation->unblock_function;
|
||||
if (unblock_function) {
|
||||
RUBY_ASSERT(unblock_function != (rb_unblock_function_t *)-1 && "unblock_function is still sentinel value -1, should have been resolved earlier");
|
||||
blocking_operation->unblock_function(blocking_operation->data2);
|
||||
}
|
||||
return 1; // Cancelled during execution (unblock function called)
|
||||
// Cancelled during execution (unblock function called):
|
||||
return 1;
|
||||
|
||||
case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED:
|
||||
case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED:
|
||||
// Already finished or cancelled
|
||||
// Already finished or cancelled:
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
31
thread.c
31
thread.c
|
@ -1540,6 +1540,29 @@ blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region)
|
|||
#endif
|
||||
}
|
||||
|
||||
/*
|
||||
* Resolve sentinel unblock function values to their actual function pointers
|
||||
* and appropriate data2 values. This centralizes the logic for handling
|
||||
* RUBY_UBF_IO and RUBY_UBF_PROCESS sentinel values.
|
||||
*
|
||||
* @param unblock_function Pointer to unblock function pointer (modified in place)
|
||||
* @param data2 Pointer to data2 pointer (modified in place)
|
||||
* @param thread Thread context for resolving data2 when needed
|
||||
* @return true if sentinel values were resolved, false otherwise
|
||||
*/
|
||||
bool
|
||||
rb_thread_resolve_unblock_function(rb_unblock_function_t **unblock_function, void **data2, struct rb_thread_struct *thread)
|
||||
{
|
||||
rb_unblock_function_t *ubf = *unblock_function;
|
||||
|
||||
if ((ubf == RUBY_UBF_IO) || (ubf == RUBY_UBF_PROCESS)) {
|
||||
*unblock_function = ubf_select;
|
||||
*data2 = thread;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
void *
|
||||
rb_nogvl(void *(*func)(void *), void *data1,
|
||||
rb_unblock_function_t *ubf, void *data2,
|
||||
|
@ -1566,11 +1589,9 @@ rb_nogvl(void *(*func)(void *), void *data1,
|
|||
bool is_main_thread = vm->ractor.main_thread == th;
|
||||
int saved_errno = 0;
|
||||
|
||||
if ((ubf == RUBY_UBF_IO) || (ubf == RUBY_UBF_PROCESS)) {
|
||||
ubf = ubf_select;
|
||||
data2 = th;
|
||||
}
|
||||
else if (ubf && rb_ractor_living_thread_num(th->ractor) == 1 && is_main_thread) {
|
||||
rb_thread_resolve_unblock_function(&ubf, &data2, th);
|
||||
|
||||
if (ubf && rb_ractor_living_thread_num(th->ractor) == 1 && is_main_thread) {
|
||||
if (flags & RB_NOGVL_UBF_ASYNC_SAFE) {
|
||||
vm->ubf_async_safe = 1;
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue