sync: only release semaphore in WaitGroup when there are waiters (#10967)
parent
b0a721b2ec
commit
e98817e5ce
|
@ -6,6 +6,12 @@ module sync
|
||||||
[trusted]
|
[trusted]
|
||||||
fn C.atomic_fetch_add_u32(voidptr, u32) u32
|
fn C.atomic_fetch_add_u32(voidptr, u32) u32
|
||||||
|
|
||||||
|
[trusted]
|
||||||
|
fn C.atomic_load_u32(voidptr) u32
|
||||||
|
|
||||||
|
[trusted]
|
||||||
|
fn C.atomic_compare_exchange_weak_u32(voidptr, voidptr, u32) bool
|
||||||
|
|
||||||
// WaitGroup
|
// WaitGroup
|
||||||
// Do not copy an instance of WaitGroup, use a ref instead.
|
// Do not copy an instance of WaitGroup, use a ref instead.
|
||||||
//
|
//
|
||||||
|
@ -22,6 +28,7 @@ fn C.atomic_fetch_add_u32(voidptr, u32) u32
|
||||||
struct WaitGroup {
|
struct WaitGroup {
|
||||||
mut:
|
mut:
|
||||||
task_count u32 // current task count - reading/writing should be atomic
|
task_count u32 // current task count - reading/writing should be atomic
|
||||||
|
wait_count u32 // current wait count - reading/writing should be atomic
|
||||||
sem Semaphore // This blocks wait() until tast_countreleased by add()
|
sem Semaphore // This blocks wait() until tast_countreleased by add()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,11 +48,22 @@ pub fn (mut wg WaitGroup) init() {
|
||||||
pub fn (mut wg WaitGroup) add(delta int) {
|
pub fn (mut wg WaitGroup) add(delta int) {
|
||||||
old_nrjobs := int(C.atomic_fetch_add_u32(&wg.task_count, u32(delta)))
|
old_nrjobs := int(C.atomic_fetch_add_u32(&wg.task_count, u32(delta)))
|
||||||
new_nrjobs := old_nrjobs + delta
|
new_nrjobs := old_nrjobs + delta
|
||||||
|
mut num_waiters := C.atomic_load_u32(&wg.wait_count)
|
||||||
if new_nrjobs < 0 {
|
if new_nrjobs < 0 {
|
||||||
panic('Negative number of jobs in waitgroup')
|
panic('Negative number of jobs in waitgroup')
|
||||||
}
|
}
|
||||||
if new_nrjobs == 0 {
|
|
||||||
wg.sem.post()
|
if new_nrjobs == 0 && num_waiters > 0 {
|
||||||
|
// clear waiters
|
||||||
|
for !C.atomic_compare_exchange_weak_u32(&wg.wait_count, &num_waiters, 0) {
|
||||||
|
if num_waiters == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (num_waiters > 0) {
|
||||||
|
wg.sem.post()
|
||||||
|
num_waiters--
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -56,5 +74,11 @@ pub fn (mut wg WaitGroup) done() {
|
||||||
|
|
||||||
// wait blocks until all tasks are done (task count becomes zero)
|
// wait blocks until all tasks are done (task count becomes zero)
|
||||||
pub fn (mut wg WaitGroup) wait() {
|
pub fn (mut wg WaitGroup) wait() {
|
||||||
|
nrjobs := int(C.atomic_load_u32(&wg.task_count))
|
||||||
|
if nrjobs == 0 {
|
||||||
|
// no need to wait
|
||||||
|
return
|
||||||
|
}
|
||||||
|
C.atomic_fetch_add_u32(&wg.wait_count, 1)
|
||||||
wg.sem.wait() // blocks until task_count becomes 0
|
wg.sem.wait() // blocks until task_count becomes 0
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,41 @@
|
||||||
|
module sync
|
||||||
|
|
||||||
|
import time
|
||||||
|
|
||||||
|
fn test_waitgroup_reuse() {
|
||||||
|
mut wg := new_waitgroup()
|
||||||
|
|
||||||
|
wg.add(1)
|
||||||
|
wg.done()
|
||||||
|
|
||||||
|
wg.add(1)
|
||||||
|
mut executed := false
|
||||||
|
go fn (mut wg WaitGroup, executed voidptr) {
|
||||||
|
defer {
|
||||||
|
wg.done()
|
||||||
|
}
|
||||||
|
unsafe {
|
||||||
|
*(&bool(executed)) = true
|
||||||
|
}
|
||||||
|
time.sleep(100 * time.millisecond)
|
||||||
|
assert wg.wait_count == 1
|
||||||
|
}(mut wg, voidptr(&executed))
|
||||||
|
|
||||||
|
wg.wait()
|
||||||
|
assert executed
|
||||||
|
assert wg.wait_count == 0
|
||||||
|
}
|
||||||
|
|
||||||
|
fn test_waitgroup_no_use() {
|
||||||
|
mut done := false
|
||||||
|
go fn (done voidptr) {
|
||||||
|
time.sleep(1 * time.second)
|
||||||
|
if *(&bool(done)) == false {
|
||||||
|
panic('test_waitgroup_no_use did not complete in time')
|
||||||
|
}
|
||||||
|
}(voidptr(&done))
|
||||||
|
|
||||||
|
mut wg := new_waitgroup()
|
||||||
|
wg.wait()
|
||||||
|
done = true
|
||||||
|
}
|
Loading…
Reference in New Issue