8327042: G1: Parallelism used for redirty logged cards needs better control.

Co-authored-by: Thomas Schatzl <tschatzl@openjdk.org>
Reviewed-by: tschatzl, ayang
Author: Ivan Walulya, 2024-03-04 15:17:57 +00:00
parent e889b460c0
commit b69d1b51c7
5 changed files with 48 additions and 27 deletions
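
In short: RedirtyLoggedCardsTask previously drained a single shared chain of buffers, with every worker CAS-ing on the same _nodes head pointer. After this change, each evacuation worker's flushed buffers are recorded as a per-worker BufferNodeList, and the redirty workers sweep those lists starting from their own index, moving on to the next list whenever a CAS is contended. The diffs below are the plumbing for that. For orientation, BufferNodeList itself is not part of this diff; it lives in HotSpot's shared GC code and is, roughly (field order and comments from memory), a plain head/tail/entry-count triple:

    struct BufferNodeList {
      BufferNode* _head;    // First node in the list, or null if empty.
      BufferNode* _tail;    // Last node in the list, or null if empty.
      size_t _entry_count;  // Sum of the entries in the list's nodes.

      BufferNodeList();     // Empty list.
      BufferNodeList(BufferNode* head, BufferNode* tail, size_t entry_count);
    };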

diff --git a/src/hotspot/share/gc/g1/g1ParScanThreadState.cpp b/src/hotspot/share/gc/g1/g1ParScanThreadState.cpp

@@ -113,15 +113,15 @@ G1ParScanThreadState::G1ParScanThreadState(G1CollectedHeap* g1h,
   initialize_numa_stats();
 }
 
-size_t G1ParScanThreadState::flush_stats(size_t* surviving_young_words, uint num_workers) {
-  _rdc_local_qset.flush();
+size_t G1ParScanThreadState::flush_stats(size_t* surviving_young_words, uint num_workers, BufferNodeList* rdc_buffers) {
+  *rdc_buffers = _rdc_local_qset.flush();
   flush_numa_stats();
 
   // Update allocation statistics.
   _plab_allocator->flush_and_retire_stats(num_workers);
   _g1h->policy()->record_age_table(&_age_table);
 
   if (_evacuation_failed_info.has_failed()) {
-     _g1h->gc_tracer_stw()->report_evacuation_failed(_evacuation_failed_info);
+    _g1h->gc_tracer_stw()->report_evacuation_failed(_evacuation_failed_info);
   }
 
   size_t sum = 0;
@@ -593,7 +593,6 @@ const size_t* G1ParScanThreadStateSet::surviving_young_words() const {
 
 void G1ParScanThreadStateSet::flush_stats() {
   assert(!_flushed, "thread local state from the per thread states should be flushed once");
-
   for (uint worker_id = 0; worker_id < _num_workers; ++worker_id) {
     G1ParScanThreadState* pss = _states[worker_id];
     assert(pss != nullptr, "must be initialized");
@@ -604,7 +603,7 @@ void G1ParScanThreadStateSet::flush_stats() {
     // because it resets the PLAB allocator where we get this info from.
     size_t lab_waste_bytes = pss->lab_waste_words() * HeapWordSize;
     size_t lab_undo_waste_bytes = pss->lab_undo_waste_words() * HeapWordSize;
-    size_t copied_bytes = pss->flush_stats(_surviving_young_words_total, _num_workers) * HeapWordSize;
+    size_t copied_bytes = pss->flush_stats(_surviving_young_words_total, _num_workers, &_rdc_buffers[worker_id]) * HeapWordSize;
     size_t evac_fail_enqueued_cards = pss->evac_failure_enqueued_cards();
 
     p->record_or_add_thread_work_item(G1GCPhaseTimes::MergePSS, worker_id, copied_bytes, G1GCPhaseTimes::MergePSSCopiedBytes);
@@ -615,6 +614,11 @@ void G1ParScanThreadStateSet::flush_stats() {
     delete pss;
     _states[worker_id] = nullptr;
   }
 
+  G1DirtyCardQueueSet& dcq = G1BarrierSet::dirty_card_queue_set();
+  dcq.merge_bufferlists(rdcqs());
+  rdcqs()->verify_empty();
+
   _flushed = true;
 }
@@ -706,6 +710,7 @@ G1ParScanThreadStateSet::G1ParScanThreadStateSet(G1CollectedHeap* g1h,
     _rdcqs(G1BarrierSet::dirty_card_queue_set().allocator()),
     _preserved_marks_set(true /* in_c_heap */),
     _states(NEW_C_HEAP_ARRAY(G1ParScanThreadState*, num_workers, mtGC)),
+    _rdc_buffers(NEW_C_HEAP_ARRAY(BufferNodeList, num_workers, mtGC)),
     _surviving_young_words_total(NEW_C_HEAP_ARRAY(size_t, collection_set->young_region_length() + 1, mtGC)),
     _num_workers(num_workers),
     _flushed(false),
@@ -713,6 +718,7 @@ G1ParScanThreadStateSet::G1ParScanThreadStateSet(G1CollectedHeap* g1h,
   _preserved_marks_set.init(num_workers);
   for (uint i = 0; i < num_workers; ++i) {
     _states[i] = nullptr;
+    _rdc_buffers[i] = BufferNodeList();
   }
   memset(_surviving_young_words_total, 0, (collection_set->young_region_length() + 1) * sizeof(size_t));
 }
@@ -721,5 +727,6 @@ G1ParScanThreadStateSet::~G1ParScanThreadStateSet() {
   assert(_flushed, "thread local state from the per thread states should have been flushed");
   FREE_C_HEAP_ARRAY(G1ParScanThreadState*, _states);
   FREE_C_HEAP_ARRAY(size_t, _surviving_young_words_total);
+  FREE_C_HEAP_ARRAY(BufferNodeList, _rdc_buffers);
   _preserved_marks_set.reclaim();
 }
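
Two things happen in this file. First, the merge of the redirtied buffers into the global dirty card queue set (merge_bufferlists() plus the verify_empty() check) moves into G1ParScanThreadStateSet::flush_stats(); it used to sit in the ~RedirtyLoggedCardsTask destructor (see the last file below), which tied the merge's timing to task destruction. Second, each per-thread state now returns the BufferNodeList it flushed, and flush_stats() records it in _rdc_buffers[worker_id] so the redirty phase can find each worker's buffers without touching a shared list.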

diff --git a/src/hotspot/share/gc/g1/g1ParScanThreadState.hpp b/src/hotspot/share/gc/g1/g1ParScanThreadState.hpp

@@ -166,7 +166,7 @@ public:
 
   // Pass locally gathered statistics to global state. Returns the total number of
   // HeapWords copied.
-  size_t flush_stats(size_t* surviving_young_words, uint num_workers);
+  size_t flush_stats(size_t* surviving_young_words, uint num_workers, BufferNodeList* buffer_log);
 
 private:
   void do_partial_array(PartialArrayScanTask task);
@@ -247,6 +247,7 @@ class G1ParScanThreadStateSet : public StackObj {
   G1RedirtyCardsQueueSet _rdcqs;
   PreservedMarksSet _preserved_marks_set;
   G1ParScanThreadState** _states;
+  BufferNodeList* _rdc_buffers;
   size_t* _surviving_young_words_total;
   uint _num_workers;
   bool _flushed;
@@ -260,12 +261,14 @@
   ~G1ParScanThreadStateSet();
 
   G1RedirtyCardsQueueSet* rdcqs() { return &_rdcqs; }
+  BufferNodeList* rdc_buffers() { return _rdc_buffers; }
   PreservedMarksSet* preserved_marks_set() { return &_preserved_marks_set; }
 
   void flush_stats();
   void record_unused_optional_region(HeapRegion* hr);
 
   G1ParScanThreadState* state_for_worker(uint worker_id);
+  uint num_workers() const { return _num_workers; }
 
   const size_t* surviving_young_words() const;
 };

diff --git a/src/hotspot/share/gc/g1/g1RedirtyCardsQueue.cpp b/src/hotspot/share/gc/g1/g1RedirtyCardsQueue.cpp

@@ -65,10 +65,12 @@ void G1RedirtyCardsLocalQueueSet::enqueue(void* value) {
   }
 }
 
-void G1RedirtyCardsLocalQueueSet::flush() {
+BufferNodeList G1RedirtyCardsLocalQueueSet::flush() {
   flush_queue(_queue);
+  BufferNodeList cur_buffers = _buffers;
   _shared_qset->add_bufferlist(_buffers);
   _buffers = BufferNodeList();
+  return cur_buffers;
 }
 
 // G1RedirtyCardsLocalQueueSet::Queue

diff --git a/src/hotspot/share/gc/g1/g1RedirtyCardsQueue.hpp b/src/hotspot/share/gc/g1/g1RedirtyCardsQueue.hpp

@@ -56,7 +56,9 @@ public:
   void enqueue(void* value);
 
   // Transfer all completed buffers to the shared qset.
-  void flush();
+  // Returns the flushed BufferNodeList which is later used
+  // as a shortcut into the shared qset.
+  BufferNodeList flush();
 };
 
 // Card table entries to be redirtied and the cards reprocessed later.
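
Note the ordering inside flush(): the snapshot is taken before add_bufferlist() hands the buffers to the shared qset, so the returned BufferNodeList describes exactly the segment this worker contributed, while ownership of the nodes passes to the shared qset. Since those segments get linked into the shared qset's chain, a segment's tail may end up with a non-null next pointer; that, presumably, is why the redirty loop in the last file stops at the recorded _tail rather than walking next() until null.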

diff --git a/src/hotspot/share/gc/g1/g1YoungGCPostEvacuateTasks.cpp b/src/hotspot/share/gc/g1/g1YoungGCPostEvacuateTasks.cpp

@@ -590,23 +590,17 @@
 };
 
 class G1PostEvacuateCollectionSetCleanupTask2::RedirtyLoggedCardsTask : public G1AbstractSubTask {
-  G1RedirtyCardsQueueSet* _rdcqs;
-  BufferNode* volatile _nodes;
+  BufferNodeList* _rdc_buffers;
+  uint _num_buffer_lists;
   G1EvacFailureRegions* _evac_failure_regions;
 
 public:
-  RedirtyLoggedCardsTask(G1RedirtyCardsQueueSet* rdcqs, G1EvacFailureRegions* evac_failure_regions) :
+  RedirtyLoggedCardsTask(G1EvacFailureRegions* evac_failure_regions, BufferNodeList* rdc_buffers, uint num_buffer_lists) :
     G1AbstractSubTask(G1GCPhaseTimes::RedirtyCards),
-    _rdcqs(rdcqs),
-    _nodes(rdcqs->all_completed_buffers()),
+    _rdc_buffers(rdc_buffers),
+    _num_buffer_lists(num_buffer_lists),
     _evac_failure_regions(evac_failure_regions) { }
 
-  virtual ~RedirtyLoggedCardsTask() {
-    G1DirtyCardQueueSet& dcq = G1BarrierSet::dirty_card_queue_set();
-    dcq.merge_bufferlists(_rdcqs);
-    _rdcqs->verify_empty();
-  }
-
   double worker_cost() const override {
     // Needs more investigation.
     return G1CollectedHeap::heap()->workers()->active_workers();
@@ -614,13 +608,23 @@
 
   void do_work(uint worker_id) override {
     RedirtyLoggedCardTableEntryClosure cl(G1CollectedHeap::heap(), _evac_failure_regions);
-    BufferNode* next = Atomic::load(&_nodes);
-    while (next != nullptr) {
-      BufferNode* node = next;
-      next = Atomic::cmpxchg(&_nodes, node, node->next());
-      if (next == node) {
-        cl.apply_to_buffer(node, worker_id);
-        next = node->next();
+    uint start = worker_id;
+    for (uint i = 0; i < _num_buffer_lists; i++) {
+      uint index = (start + i) % _num_buffer_lists;
+      BufferNode* next = Atomic::load(&_rdc_buffers[index]._head);
+      BufferNode* tail = Atomic::load(&_rdc_buffers[index]._tail);
+      while (next != nullptr) {
+        BufferNode* node = next;
+        next = Atomic::cmpxchg(&_rdc_buffers[index]._head, node, (node != tail) ? node->next() : nullptr);
+        if (next == node) {
+          cl.apply_to_buffer(node, worker_id);
+          next = (node != tail) ? node->next() : nullptr;
+        } else {
+          break; // If there is contention, move to the next BufferNodeList
+        }
       }
     }
     record_work_item(worker_id, 0, cl.num_dirtied());
@@ -970,7 +974,10 @@ G1PostEvacuateCollectionSetCleanupTask2::G1PostEvacuateCollectionSetCleanupTask2
     add_parallel_task(new RestorePreservedMarksTask(per_thread_states->preserved_marks_set()));
     add_parallel_task(new ProcessEvacuationFailedRegionsTask(evac_failure_regions));
   }
-  add_parallel_task(new RedirtyLoggedCardsTask(per_thread_states->rdcqs(), evac_failure_regions));
+  add_parallel_task(new RedirtyLoggedCardsTask(evac_failure_regions,
+                                               per_thread_states->rdc_buffers(),
+                                               per_thread_states->num_workers()));
+
   if (UseTLAB && ResizeTLAB) {
     add_parallel_task(new ResizeTLABsTask());
   }
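
To see why the new do_work() loop spreads contention, here is a minimal, self-contained C++ sketch of the same claiming scheme; all names in it are illustrative stand-ins, not JDK code. Several fixed lists are spliced into one chain (as merging into the shared qset does), each list remembers its own head and tail, and each worker starts at "its" list, claims nodes head-first with a CAS, and abandons a list on the first failed CAS because the winner of that CAS keeps draining it:

    // Illustrative sketch only -- not JDK code.
    #include <atomic>
    #include <cstdio>
    #include <thread>
    #include <vector>

    struct Node {
      Node* next = nullptr;
      int payload = 1;
    };

    struct NodeList {
      std::atomic<Node*> head{nullptr};
      Node* tail = nullptr;  // Fixed before workers start; claiming stops here.
    };

    // Mirrors RedirtyLoggedCardsTask::do_work(): visit every list, starting at
    // the worker's own index; claim head nodes with a CAS; on contention, move on.
    void do_work(unsigned worker_id, NodeList* lists, unsigned num_lists,
                 std::atomic<int>* processed) {
      for (unsigned i = 0; i < num_lists; i++) {
        unsigned index = (worker_id + i) % num_lists;
        NodeList& l = lists[index];
        Node* next = l.head.load();
        while (next != nullptr) {
          Node* node = next;
          // Stop at the recorded tail: its next pointer may lead into another
          // spliced list, which has its own head to claim from.
          Node* new_head = (node != l.tail) ? node->next : nullptr;
          if (l.head.compare_exchange_strong(next, new_head)) {
            processed->fetch_add(node->payload);  // Stands in for apply_to_buffer().
            next = new_head;
          } else {
            break;  // Contention: the CAS winner keeps draining this list.
          }
        }
      }
    }

    int main() {
      const unsigned kLists = 4;
      const unsigned kNodes = 1000;  // Nodes per list.
      std::vector<Node> storage(kLists * kNodes);
      std::vector<NodeList> lists(kLists);

      // Build each list as a chain and record its head and tail.
      for (unsigned l = 0; l < kLists; l++) {
        for (unsigned n = 0; n + 1 < kNodes; n++) {
          storage[l * kNodes + n].next = &storage[l * kNodes + n + 1];
        }
        lists[l].head.store(&storage[l * kNodes]);
        lists[l].tail = &storage[l * kNodes + kNodes - 1];
      }
      // Splice the lists into one chain, as merging into a shared qset would.
      for (unsigned l = 0; l + 1 < kLists; l++) {
        lists[l].tail->next = lists[l + 1].head.load();
      }

      std::atomic<int> processed{0};
      std::vector<std::thread> workers;
      for (unsigned w = 0; w < kLists; w++) {
        workers.emplace_back(do_work, w, lists.data(), kLists, &processed);
      }
      for (std::thread& t : workers) {
        t.join();
      }
      // A lost CAS means another worker claimed that node and continues the
      // drain, so abandoning a list loses no work: all nodes are processed once.
      std::printf("processed %d of %u nodes\n", processed.load(), kLists * kNodes);
      return 0;
    }

With every worker starting at a different offset, in the common case each worker drains its own list without ever colliding; the CAS only becomes contended near the end, when fast workers catch up with slow ones, and even then a loser simply moves on instead of retrying against the same head.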