From f05ee26a1f4814f4fc88d32099f4d2bbc9aca824 Mon Sep 17 00:00:00 2001 From: Peter Zhu Date: Mon, 14 Jul 2025 11:01:34 -0400 Subject: [PATCH] Add rb_concurrent_set_find --- concurrent_set.c | 55 +++++++++++++++++++++++++++++++++++++++ internal/concurrent_set.h | 2 ++ 2 files changed, 57 insertions(+) diff --git a/concurrent_set.c b/concurrent_set.c index 3380bd189d..e84bd2603c 100644 --- a/concurrent_set.c +++ b/concurrent_set.c @@ -175,6 +175,61 @@ concurrent_set_try_resize(VALUE old_set_obj, VALUE *set_obj_ptr) } } +VALUE +rb_concurrent_set_find(VALUE *set_obj_ptr, VALUE key) +{ + RUBY_ASSERT(key >= CONCURRENT_SET_SPECIAL_VALUE_COUNT); + + VALUE set_obj; + + retry: + set_obj = RUBY_ATOMIC_VALUE_LOAD(*set_obj_ptr); + RUBY_ASSERT(set_obj); + struct concurrent_set *set = RTYPEDDATA_GET_DATA(set_obj); + + struct concurrent_set_probe probe; + VALUE hash = set->funcs->hash(key); + int idx = concurrent_set_probe_start(&probe, set, hash); + + while (true) { + struct concurrent_set_entry *entry = &set->entries[idx]; + VALUE curr_key = RUBY_ATOMIC_VALUE_LOAD(entry->key); + + switch (curr_key) { + case CONCURRENT_SET_EMPTY: + return 0; + case CONCURRENT_SET_DELETED: + break; + case CONCURRENT_SET_MOVED: + // Wait + RB_VM_LOCKING(); + + goto retry; + default: { + VALUE curr_hash = RUBY_ATOMIC_VALUE_LOAD(entry->hash); + if ((curr_hash == hash || curr_hash == 0) && set->funcs->cmp(key, curr_key)) { + // We've found a match. + if (UNLIKELY(!RB_SPECIAL_CONST_P(curr_key) && rb_objspace_garbage_object_p(curr_key))) { + // This is a weakref set, so after marking but before sweeping is complete we may find a matching garbage object. + // Skip it and mark it as deleted. + RUBY_ATOMIC_VALUE_CAS(entry->key, curr_key, CONCURRENT_SET_DELETED); + + // Fall through and continue our search. + } + else { + RB_GC_GUARD(set_obj); + return curr_key; + } + } + + break; + } + } + + idx = concurrent_set_probe_next(&probe); + } +} + VALUE rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data) { diff --git a/internal/concurrent_set.h b/internal/concurrent_set.h index ecd33d85ce..b249acd64f 100644 --- a/internal/concurrent_set.h +++ b/internal/concurrent_set.h @@ -1,6 +1,7 @@ #ifndef RUBY_RACTOR_SAFE_TABLE_H #define RUBY_RACTOR_SAFE_TABLE_H +#include "ruby/atomic.h" #include "ruby/ruby.h" typedef VALUE (*rb_concurrent_set_hash_func)(VALUE key); @@ -15,6 +16,7 @@ struct rb_concurrent_set_funcs { VALUE rb_concurrent_set_new(const struct rb_concurrent_set_funcs *funcs, int capacity); rb_atomic_t rb_concurrent_set_size(VALUE set_obj); +VALUE rb_concurrent_set_find(VALUE *set_obj_ptr, VALUE key); VALUE rb_concurrent_set_find_or_insert(VALUE *set_obj_ptr, VALUE key, void *data); VALUE rb_concurrent_set_delete_by_identity(VALUE set_obj, VALUE key); void rb_concurrent_set_foreach_with_replace(VALUE set_obj, int (*callback)(VALUE *key, void *data), void *data);