mirror of
https://github.com/openjdk/jdk.git
synced 2025-08-27 14:54:52 +02:00
8193174: SubmissionPublisher invokes the Subscriber's onComplete before all of its submitted items have been published
Reviewed-by: martin, psandoz, chegar
This commit is contained in:
parent
ed69a7db9c
commit
0b3b384a27
1 changed files with 10 additions and 8 deletions
|
@ -1252,18 +1252,20 @@ public class SubmissionPublisher<T> implements Publisher<T>,
|
|||
head = h += taken;
|
||||
d = subtractDemand(taken);
|
||||
}
|
||||
else if ((empty = (t == h)) && (c & COMPLETE) != 0) {
|
||||
closeOnComplete(s); // end of stream
|
||||
break;
|
||||
}
|
||||
else if ((d = demand) == 0L && (c & REQS) != 0)
|
||||
weakCasCtl(c, c & ~REQS); // exhausted demand
|
||||
else if (d != 0L && (c & REQS) == 0)
|
||||
weakCasCtl(c, c | REQS); // new demand
|
||||
else if (t == (t = tail) && (empty || d == 0L)) {
|
||||
int bit = ((c & ACTIVE) != 0) ? ACTIVE : RUN;
|
||||
if (weakCasCtl(c, c & ~bit) && bit == RUN)
|
||||
break; // un-keep-alive or exit
|
||||
else if (t == (t = tail)) { // stability check
|
||||
if ((empty = (t == h)) && (c & COMPLETE) != 0) {
|
||||
closeOnComplete(s); // end of stream
|
||||
break;
|
||||
}
|
||||
else if (empty || d == 0L) {
|
||||
int bit = ((c & ACTIVE) != 0) ? ACTIVE : RUN;
|
||||
if (weakCasCtl(c, c & ~bit) && bit == RUN)
|
||||
break; // un-keep-alive or exit
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue