Implement SizedQueue#push(timeout: sec)

[Feature #18944]

If both `non_block=true` and `timeout:` are supplied, ArgumentError
is raised.
This commit is contained in:
Jean Boussier 2022-07-26 17:40:00 +02:00
parent b3718edee2
commit fe61cad749
Notes: git 2022-08-18 17:07:58 +09:00
4 changed files with 111 additions and 35 deletions

View file

@ -1229,39 +1229,15 @@ rb_szqueue_max_set(VALUE self, VALUE vmax)
return vmax;
}
static int
szqueue_push_should_block(int argc, const VALUE *argv)
{
int should_block = 1;
rb_check_arity(argc, 1, 2);
if (argc > 1) {
should_block = !RTEST(argv[1]);
}
return should_block;
}
/*
* Document-method: Thread::SizedQueue#push
* call-seq:
* push(object, non_block=false)
* enq(object, non_block=false)
* <<(object)
*
* Pushes +object+ to the queue.
*
* If there is no space left in the queue, waits until space becomes
* available, unless +non_block+ is true. If +non_block+ is true, the
* thread isn't suspended, and +ThreadError+ is raised.
*/
static VALUE
rb_szqueue_push(int argc, VALUE *argv, VALUE self)
rb_szqueue_push(rb_execution_context_t *ec, VALUE self, VALUE object, VALUE non_block, VALUE timeout)
{
rb_hrtime_t end = queue_timeout2hrtime(timeout);
bool timed_out = false;
struct rb_szqueue *sq = szqueue_ptr(self);
int should_block = szqueue_push_should_block(argc, argv);
while (queue_length(self, &sq->q) >= sq->max) {
if (!should_block) {
if (RTEST(non_block)) {
rb_raise(rb_eThreadError, "queue full");
}
else if (queue_closed_p(self)) {
@ -1281,11 +1257,14 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self)
struct queue_sleep_arg queue_sleep_arg = {
.self = self,
.timeout = Qnil,
.end = 0
.timeout = timeout,
.end = end
};
rb_ensure(queue_sleep, (VALUE)&queue_sleep_arg, szqueue_sleep_done, (VALUE)&queue_waiter);
if (!NIL_P(timeout) && rb_hrtime_now() >= end) {
timed_out = true;
break;
}
}
}
@ -1293,7 +1272,9 @@ rb_szqueue_push(int argc, VALUE *argv, VALUE self)
raise_closed_queue_error(self);
}
return queue_do_push(self, &sq->q, argv[0]);
if (timed_out) return Qnil;
return queue_do_push(self, &sq->q, object);
}
static VALUE
@ -1611,13 +1592,10 @@ Init_thread_sync(void)
rb_define_method(rb_cSizedQueue, "close", rb_szqueue_close, 0);
rb_define_method(rb_cSizedQueue, "max", rb_szqueue_max_get, 0);
rb_define_method(rb_cSizedQueue, "max=", rb_szqueue_max_set, 1);
rb_define_method(rb_cSizedQueue, "push", rb_szqueue_push, -1);
rb_define_method(rb_cSizedQueue, "empty?", rb_szqueue_empty_p, 0);
rb_define_method(rb_cSizedQueue, "clear", rb_szqueue_clear, 0);
rb_define_method(rb_cSizedQueue, "length", rb_szqueue_length, 0);
rb_define_method(rb_cSizedQueue, "num_waiting", rb_szqueue_num_waiting, 0);
rb_define_alias(rb_cSizedQueue, "enq", "push");
rb_define_alias(rb_cSizedQueue, "<<", "push");
rb_define_alias(rb_cSizedQueue, "size", "length");
/* CVar */