From 640bbbae8509d2a6afaf1d5124ca82899b623c4e Mon Sep 17 00:00:00 2001 From: Richard Warburton Date: Mon, 22 Jun 2020 21:07:34 +1200 Subject: [PATCH] waitgroup: remove races, make platform independent, document --- vlib/sync/waitgroup.v | 64 +++++++++++++++++++++++++++---------------- 1 file changed, 41 insertions(+), 23 deletions(-) diff --git a/vlib/sync/waitgroup.v b/vlib/sync/waitgroup.v index 92fe3b16d7..a72437af2d 100644 --- a/vlib/sync/waitgroup.v +++ b/vlib/sync/waitgroup.v @@ -1,41 +1,59 @@ -// Copyright (c) 2019-2020 Alexander Medvednikov. All rights reserved. +// Copyright (c) 2019 Alexander Medvednikov. All rights reserved. // Use of this source code is governed by an MIT license // that can be found in the LICENSE file. module sync +// WaitGroup implementation. wait() blocks until all tasks complete. +// Do not copy an instance of WaitGroup, use a ref instead. +// +// Two mutexes are required so that wait() doesn't unblock on done()/add() before +// task_count becomes zero. +// // [init_with=new_waitgroup] // TODO: implement support for init_with struct attribute, and disallow WaitGroup{} from outside the sync.new_waitgroup() function. [ref_only] -pub struct WaitGroup { +struct WaitGroup { mut: - mu &Mutex = &Mutex(0) - active int + task_count int // current task count + task_count_mutex &Mutex = &Mutex(0) // This mutex protects the task_count count in add() + wait_blocker &Mutex = &Mutex(0) // This mutex blocks the wait() until released by add() } pub fn new_waitgroup() &WaitGroup { - return &WaitGroup{mu: sync.new_mutex() } -} - -pub fn (mut wg WaitGroup) add(delta int) { - wg.mu.lock() - wg.active += delta - wg.mu.unlock() - if wg.active < 0 { - panic('Negative number of jobs in waitgroup') + return &WaitGroup{ + task_count_mutex: new_mutex() + wait_blocker: new_mutex() } } +// add increments (+ve delta) or decrements (-ve delta) task count by delta +// and unblocks any wait() calls if task count becomes zero. +// add panics if task count drops below zero. +pub fn (mut wg WaitGroup) add(delta int) { + // protect task_count + wg.task_count_mutex.lock() + defer { + wg.task_count_mutex.unlock() + } + // If task_count likely to leave zero, set wait() to block + if wg.task_count == 0 { + wg.wait_blocker.lock() + } + wg.task_count += delta + if wg.task_count < 0 { + panic('Negative number of jobs in waitgroup') + } + if wg.task_count == 0 { // if no more task_count tasks + wg.wait_blocker.unlock() // unblock wait() + } +} + +// done is a convenience fn for add(-1) pub fn (mut wg WaitGroup) done() { wg.add(-1) } -pub fn (wg &WaitGroup) wait() { - for wg.active > 0 { - // Do not remove this, busy empty loops are optimized - // with -prod by some compilers, see issue #2874 - $if windows { - C.Sleep(1) - } $else { - C.usleep(1000) - } - } +// wait blocks until all tasks are done (task count becomes zero) +pub fn (mut wg WaitGroup) wait() { + wg.wait_blocker.lock() // blocks until task_count becomes 0 + wg.wait_blocker.unlock() // allow other wait()s to unblock or reuse wait group }