mirror of
https://github.com/ruby/ruby.git
synced 2025-08-15 13:39:04 +02:00
Implement Queue#pop(timeout: sec)
[Feature #18774] As well as `SizedQueue#pop(timeout: sec)` If both `non_block=true` and `timeout:` are supplied, ArgumentError is raised.
This commit is contained in:
parent
ec3f59309e
commit
e3aabe93aa
Notes:
git
2022-08-02 18:04:59 +09:00
10 changed files with 238 additions and 75 deletions
105
thread_sync.c
105
thread_sync.c
|
@ -1,5 +1,6 @@
|
|||
/* included by thread.c */
|
||||
#include "ccan/list/list.h"
|
||||
#include "builtin.h"
|
||||
|
||||
static VALUE rb_cMutex, rb_cQueue, rb_cSizedQueue, rb_cConditionVariable;
|
||||
static VALUE rb_eClosedQueueError;
|
||||
|
@ -19,6 +20,12 @@ struct sync_waiter {
|
|||
struct ccan_list_node node;
|
||||
};
|
||||
|
||||
struct queue_sleep_arg {
|
||||
VALUE self;
|
||||
VALUE timeout;
|
||||
rb_hrtime_t end;
|
||||
};
|
||||
|
||||
#define MUTEX_ALLOW_TRAP FL_USER1
|
||||
|
||||
static void
|
||||
|
@ -514,7 +521,7 @@ rb_mutex_abandon_all(rb_mutex_t *mutexes)
|
|||
static VALUE
|
||||
rb_mutex_sleep_forever(VALUE self)
|
||||
{
|
||||
rb_thread_sleep_deadly_allow_spurious_wakeup(self);
|
||||
rb_thread_sleep_deadly_allow_spurious_wakeup(self, Qnil, 0);
|
||||
return Qnil;
|
||||
}
|
||||
|
||||
|
@ -706,6 +713,21 @@ queue_ptr(VALUE obj)
|
|||
|
||||
#define QUEUE_CLOSED FL_USER5
|
||||
|
||||
static rb_hrtime_t
|
||||
queue_timeout2hrtime(VALUE timeout) {
|
||||
if (NIL_P(timeout)) {
|
||||
return (rb_hrtime_t)0;
|
||||
}
|
||||
rb_hrtime_t rel = 0;
|
||||
if (FIXNUM_P(timeout)) {
|
||||
rel = rb_sec2hrtime(NUM2TIMET(timeout));
|
||||
}
|
||||
else {
|
||||
double2hrtime(&rel, rb_num2dbl(timeout));
|
||||
}
|
||||
return rb_hrtime_add(rel, rb_hrtime_now());
|
||||
}
|
||||
|
||||
static void
|
||||
szqueue_mark(void *ptr)
|
||||
{
|
||||
|
@ -964,9 +986,10 @@ rb_queue_push(VALUE self, VALUE obj)
|
|||
}
|
||||
|
||||
static VALUE
|
||||
queue_sleep(VALUE self)
|
||||
queue_sleep(VALUE _args)
|
||||
{
|
||||
rb_thread_sleep_deadly_allow_spurious_wakeup(self);
|
||||
struct queue_sleep_arg *args = (struct queue_sleep_arg *)_args;
|
||||
rb_thread_sleep_deadly_allow_spurious_wakeup(args->self, args->timeout, args->end);
|
||||
return Qnil;
|
||||
}
|
||||
|
||||
|
@ -1001,9 +1024,10 @@ szqueue_sleep_done(VALUE p)
|
|||
}
|
||||
|
||||
static VALUE
|
||||
queue_do_pop(VALUE self, struct rb_queue *q, int should_block)
|
||||
queue_do_pop(VALUE self, struct rb_queue *q, int should_block, VALUE timeout)
|
||||
{
|
||||
check_array(self, q->que);
|
||||
rb_hrtime_t end = queue_timeout2hrtime(timeout);
|
||||
|
||||
while (RARRAY_LEN(q->que) == 0) {
|
||||
if (!should_block) {
|
||||
|
@ -1028,43 +1052,25 @@ queue_do_pop(VALUE self, struct rb_queue *q, int should_block)
|
|||
ccan_list_add_tail(waitq, &queue_waiter.w.node);
|
||||
queue_waiter.as.q->num_waiting++;
|
||||
|
||||
rb_ensure(queue_sleep, self, queue_sleep_done, (VALUE)&queue_waiter);
|
||||
struct queue_sleep_arg queue_sleep_arg = {
|
||||
.self = self,
|
||||
.timeout = timeout,
|
||||
.end = end
|
||||
};
|
||||
|
||||
rb_ensure(queue_sleep, (VALUE)&queue_sleep_arg, queue_sleep_done, (VALUE)&queue_waiter);
|
||||
if (!NIL_P(timeout) && (rb_hrtime_now() >= end))
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return rb_ary_shift(q->que);
|
||||
}
|
||||
|
||||
static int
|
||||
queue_pop_should_block(int argc, const VALUE *argv)
|
||||
{
|
||||
int should_block = 1;
|
||||
rb_check_arity(argc, 0, 1);
|
||||
if (argc > 0) {
|
||||
should_block = !RTEST(argv[0]);
|
||||
}
|
||||
return should_block;
|
||||
}
|
||||
|
||||
/*
|
||||
* Document-method: Thread::Queue#pop
|
||||
* call-seq:
|
||||
* pop(non_block=false)
|
||||
* deq(non_block=false)
|
||||
* shift(non_block=false)
|
||||
*
|
||||
* Retrieves data from the queue.
|
||||
*
|
||||
* If the queue is empty, the calling thread is suspended until data is pushed
|
||||
* onto the queue. If +non_block+ is true, the thread isn't suspended, and
|
||||
* +ThreadError+ is raised.
|
||||
*/
|
||||
|
||||
static VALUE
|
||||
rb_queue_pop(int argc, VALUE *argv, VALUE self)
|
||||
rb_queue_pop(rb_execution_context_t *ec, VALUE self, VALUE non_block, VALUE timeout)
|
||||
{
|
||||
int should_block = queue_pop_should_block(argc, argv);
|
||||
return queue_do_pop(self, queue_ptr(self), should_block);
|
||||
return queue_do_pop(self, queue_ptr(self), !RTEST(non_block), timeout);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -1283,10 +1289,10 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self)
|
|||
}
|
||||
|
||||
static VALUE
|
||||
szqueue_do_pop(VALUE self, int should_block)
|
||||
szqueue_do_pop(VALUE self, int should_block, VALUE timeout)
|
||||
{
|
||||
struct rb_szqueue *sq = szqueue_ptr(self);
|
||||
VALUE retval = queue_do_pop(self, &sq->q, should_block);
|
||||
VALUE retval = queue_do_pop(self, &sq->q, should_block, timeout);
|
||||
|
||||
if (queue_length(self, &sq->q) < sq->max) {
|
||||
wakeup_one(szqueue_pushq(sq));
|
||||
|
@ -1294,26 +1300,10 @@ szqueue_do_pop(VALUE self, int should_block)
|
|||
|
||||
return retval;
|
||||
}
|
||||
|
||||
/*
|
||||
* Document-method: Thread::SizedQueue#pop
|
||||
* call-seq:
|
||||
* pop(non_block=false)
|
||||
* deq(non_block=false)
|
||||
* shift(non_block=false)
|
||||
*
|
||||
* Retrieves data from the queue.
|
||||
*
|
||||
* If the queue is empty, the calling thread is suspended until data is pushed
|
||||
* onto the queue. If +non_block+ is true, the thread isn't suspended, and
|
||||
* +ThreadError+ is raised.
|
||||
*/
|
||||
|
||||
static VALUE
|
||||
rb_szqueue_pop(int argc, VALUE *argv, VALUE self)
|
||||
rb_szqueue_pop(rb_execution_context_t *ec, VALUE self, VALUE non_block, VALUE timeout)
|
||||
{
|
||||
int should_block = queue_pop_should_block(argc, argv);
|
||||
return szqueue_do_pop(self, should_block);
|
||||
return szqueue_do_pop(self, !RTEST(non_block), timeout);
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -1597,7 +1587,6 @@ Init_thread_sync(void)
|
|||
rb_define_method(rb_cQueue, "close", rb_queue_close, 0);
|
||||
rb_define_method(rb_cQueue, "closed?", rb_queue_closed_p, 0);
|
||||
rb_define_method(rb_cQueue, "push", rb_queue_push, 1);
|
||||
rb_define_method(rb_cQueue, "pop", rb_queue_pop, -1);
|
||||
rb_define_method(rb_cQueue, "empty?", rb_queue_empty_p, 0);
|
||||
rb_define_method(rb_cQueue, "clear", rb_queue_clear, 0);
|
||||
rb_define_method(rb_cQueue, "length", rb_queue_length, 0);
|
||||
|
@ -1605,8 +1594,6 @@ Init_thread_sync(void)
|
|||
|
||||
rb_define_alias(rb_cQueue, "enq", "push");
|
||||
rb_define_alias(rb_cQueue, "<<", "push");
|
||||
rb_define_alias(rb_cQueue, "deq", "pop");
|
||||
rb_define_alias(rb_cQueue, "shift", "pop");
|
||||
rb_define_alias(rb_cQueue, "size", "length");
|
||||
|
||||
DEFINE_CLASS(SizedQueue, Queue);
|
||||
|
@ -1617,16 +1604,12 @@ Init_thread_sync(void)
|
|||
rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0);
|
||||
rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
|
||||
rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, -1);
|
||||
rb_define_method(rb_cSizedQueue, "pop", rb_szqueue_pop, -1);
|
||||
rb_define_method(rb_cSizedQueue, "empty?", rb_szqueue_empty_p, 0);
|
||||
rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0);
|
||||
rb_define_method(rb_cSizedQueue, "length", rb_szqueue_length, 0);
|
||||
rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);
|
||||
|
||||
rb_define_alias(rb_cSizedQueue, "enq", "push");
|
||||
rb_define_alias(rb_cSizedQueue, "<<", "push");
|
||||
rb_define_alias(rb_cSizedQueue, "deq", "pop");
|
||||
rb_define_alias(rb_cSizedQueue, "shift", "pop");
|
||||
rb_define_alias(rb_cSizedQueue, "size", "length");
|
||||
|
||||
/* CVar */
|
||||
|
@ -1644,3 +1627,5 @@ Init_thread_sync(void)
|
|||
|
||||
rb_provide("thread.rb");
|
||||
}
|
||||
|
||||
#include "thread_sync.rbinc"
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue