From 8df6e59678341b93ca618d3fe4ac27aec0b7c47f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uwe=20Kr=C3=BCger?= <45282134+UweKrueger@users.noreply.github.com> Date: Wed, 15 Jul 2020 10:22:33 +0200 Subject: [PATCH] sync: add semaphores (#5831) --- vlib/builtin/cfns.c.v | 16 ++++ vlib/sync/sync_nix.c.v | 121 ++++++++++++++++++++++++++++ vlib/sync/sync_windows.c.v | 31 +++++++ vlib/time/time_nix.c.v | 25 ++++++ vlib/v/tests/autolock_array2_test.v | 24 ++---- vlib/v/tests/semaphore_test.v | 25 ++++++ vlib/v/tests/semaphore_timed_test.v | 27 +++++++ 7 files changed, 253 insertions(+), 16 deletions(-) create mode 100644 vlib/v/tests/semaphore_test.v create mode 100644 vlib/v/tests/semaphore_timed_test.v diff --git a/vlib/builtin/cfns.c.v b/vlib/builtin/cfns.c.v index 43356bac8e..8b1abad953 100644 --- a/vlib/builtin/cfns.c.v +++ b/vlib/builtin/cfns.c.v @@ -390,6 +390,9 @@ fn C.ReleaseMutex(voidptr) bool fn C.CreateEvent(int, bool, bool, byteptr) voidptr fn C.SetEvent(voidptr) int +fn C.CreateSemaphore(voidptr, int, int, voidptr) voidptr +fn C.ReleaseSemaphore(voidptr, int, voidptr) voidptr + fn C.InitializeSRWLock(voidptr) fn C.AcquireSRWLockShared(voidptr) fn C.AcquireSRWLockExclusive(voidptr) @@ -409,6 +412,19 @@ fn C.pthread_rwlock_rdlock(voidptr) int fn C.pthread_rwlock_wrlock(voidptr) int fn C.pthread_rwlock_unlock(voidptr) int +fn C.pthread_condattr_init(voidptr) int +fn C.pthread_condattr_setpshared(voidptr, int) int +fn C.pthread_cond_init(voidptr, voidptr) int +fn C.pthread_cond_signal(voidptr) int +fn C.pthread_cond_wait(voidptr, voidptr) int +fn C.pthread_cond_timedwait(voidptr, voidptr, voidptr) int + +fn C.sem_init(voidptr, int, u32) int +fn C.sem_post(voidptr) int +fn C.sem_wait(voidptr) int +fn C.sem_trywait(voidptr) int +fn C.sem_timedwait(voidptr, voidptr) int + fn C.read(fd int, buf voidptr, count size_t) int fn C.write(fd int, buf voidptr, count size_t) int fn C.close(fd int) int diff --git a/vlib/sync/sync_nix.c.v b/vlib/sync/sync_nix.c.v index a11e9fdf25..0fb125c2d1 100644 --- a/vlib/sync/sync_nix.c.v +++ b/vlib/sync/sync_nix.c.v @@ -3,7 +3,10 @@ // that can be found in the LICENSE file. module sync +import time + #flag -lpthread +#include // [init_with=new_mutex] // TODO: implement support for this struct attribute, and disallow Mutex{} from outside the sync.new_mutex() function. [ref_only] @@ -21,6 +24,36 @@ struct RwMutexAttr { attr C.pthread_rwlockattr_t } +/* MacOSX has no unnamed semaphores and no `timed_wait()` at all + so we emulate the behaviour with other devices */ +struct MacOSX_Semaphore { + mtx C.pthread_mutex_t + cond C.pthread_cond_t +mut: + count int +} + +[ref_only] +struct PosixSemaphore { + sem C.sem_t +} + +[ref_only] +struct CondAttr { + attr C.pthread_condattr_t +} + +pub struct Semaphore { + /* + $if macos { + sem &MacOSX_Semaphore + } $else { + sem &PosixSemaphore + } + */ + sem voidptr // since the above does not work, yet +} + pub fn new_mutex() &Mutex { m := &Mutex{} C.pthread_mutex_init(&m.mutex, C.NULL) @@ -65,3 +98,91 @@ pub fn (mut m RwMutex) r_unlock() { pub fn (mut m RwMutex) w_unlock() { C.pthread_rwlock_unlock(&m.mutex) } + +pub fn new_semaphore() Semaphore { + $if macos { + s := Semaphore{ + sem: &MacOSX_Semaphore{count: 0} + } + C.pthread_mutex_init(&&MacOSX_Semaphore(s.sem).mtx, C.NULL) + a := &CondAttr{} + C.pthread_condattr_init(&a.attr) + C.pthread_condattr_setpshared(&a.attr, C.PTHREAD_PROCESS_PRIVATE) + C.pthread_cond_init(&&MacOSX_Semaphore(s.sem).cond, &a.attr) + return s + } $else { + s := Semaphore{ + sem: &PosixSemaphore{} + } + C.sem_init(&&PosixSemaphore(s.sem).sem, 0, 0) + return s + } +} + +pub fn (s Semaphore) post() { + $if macos { + C.pthread_mutex_lock(&&MacOSX_Semaphore(s.sem).mtx) + (&MacOSX_Semaphore(s.sem)).count++ + C.pthread_cond_signal(&&MacOSX_Semaphore(s.sem).cond) + C.pthread_mutex_unlock(&&MacOSX_Semaphore(s.sem).mtx) + } $else { + C.sem_post(&&PosixSemaphore(s.sem).sem) + } +} + +pub fn (s Semaphore) wait() { + $if macos { + C.pthread_mutex_lock(&&MacOSX_Semaphore(s.sem).mtx) + for &MacOSX_Semaphore(s.sem).count == 0 { + C.pthread_cond_wait(&&MacOSX_Semaphore(s.sem).cond, &&MacOSX_Semaphore(s.sem).mtx) + } + (&MacOSX_Semaphore(s.sem)).count-- + C.pthread_mutex_unlock(&&MacOSX_Semaphore(s.sem).mtx) + } $else { + C.sem_wait(&&PosixSemaphore(s.sem).sem) + } +} + +pub fn (s Semaphore) try_wait() bool { + $if macos { + t_spec := time.zero_timespec() + C.pthread_mutex_lock(&&MacOSX_Semaphore(s.sem).mtx) + for &MacOSX_Semaphore(s.sem).count == 0 { + res := C.pthread_cond_timedwait(&&MacOSX_Semaphore(s.sem).cond, &&MacOSX_Semaphore(s.sem).mtx, &t_spec) + if res == C.ETIMEDOUT { + break + } + } + mut res := false + if &MacOSX_Semaphore(s.sem).count > 0 { // success + (&MacOSX_Semaphore(s.sem)).count-- + res = true + } + C.pthread_mutex_unlock(&&MacOSX_Semaphore(s.sem).mtx) + return res + } $else { + return C.sem_trywait(&&PosixSemaphore(s.sem).sem) == 0 + } +} + +pub fn (s Semaphore) timed_wait(timeout time.Duration) bool { + t_spec := timeout.timespec() + $if macos { + C.pthread_mutex_lock(&&MacOSX_Semaphore(s.sem).mtx) + for &MacOSX_Semaphore(s.sem).count == 0 { + res := C.pthread_cond_timedwait(&&MacOSX_Semaphore(s.sem).cond, &&MacOSX_Semaphore(s.sem).mtx, &t_spec) + if res == C.ETIMEDOUT { + break + } + } + mut res := false + if &MacOSX_Semaphore(s.sem).count > 0 { // success + (&MacOSX_Semaphore(s.sem)).count-- + res = true + } + C.pthread_mutex_unlock(&&MacOSX_Semaphore(s.sem).mtx) + return res + } $else { + return C.sem_timedwait(&&PosixSemaphore(s.sem).sem, &t_spec) == 0 + } +} diff --git a/vlib/sync/sync_windows.c.v b/vlib/sync/sync_windows.c.v index d09a4de1ac..9574fa52db 100644 --- a/vlib/sync/sync_windows.c.v +++ b/vlib/sync/sync_windows.c.v @@ -3,11 +3,15 @@ // that can be found in the LICENSE file. module sync +import time + // TODO: The suggestion of using CriticalSection instead of mutex // was discussed. Needs consideration. // Mutex HANDLE type MHANDLE voidptr +// Semaphore HANDLE +type SHANDLE voidptr //[init_with=new_mutex] // TODO: implement support for this struct attribute, and disallow Mutex{} from outside the sync.new_mutex() function. @@ -28,6 +32,11 @@ mut: mx C.SRWLOCK // mutex handle } +pub struct Semaphore { +mut: + sem SHANDLE +} + enum MutexState { broken waiting @@ -118,3 +127,25 @@ pub fn (mut m Mutex) destroy() { C.CloseHandle(m.mx) // destroy mutex m.state = .destroyed // setting up reference to invalid state } + +pub fn new_semaphore() Semaphore { + return Semaphore{ + sem: SHANDLE(C.CreateSemaphore(0, 0, C.INT32_MAX, 0)) + } +} + +pub fn (s Semaphore) post() { + C.ReleaseSemaphore(s.sem, 1, 0) +} + +pub fn (s Semaphore) wait() { + C.WaitForSingleObject(s.sem, C.INFINITE) +} + +pub fn (s Semaphore) try_wait() bool { + return C.WaitForSingleObject(s.sem, 0) == 0 +} + +pub fn (s Semaphore) timed_wait(timeout time.Duration) bool { + return C.WaitForSingleObject(s.sem, timeout / time.millisecond) == 0 +} diff --git a/vlib/time/time_nix.c.v b/vlib/time/time_nix.c.v index 86f1090cb6..b824cfcffd 100644 --- a/vlib/time/time_nix.c.v +++ b/vlib/time/time_nix.c.v @@ -35,6 +35,7 @@ type time_t voidptr // in most systems, these are __quad_t, which is an i64 struct C.timespec { +mut: tv_sec i64 tv_nsec i64 } @@ -78,3 +79,27 @@ pub struct C.timeval { tv_sec u64 tv_usec u64 } + +// return absolute timespec for now()+d +pub fn (d Duration) timespec() C.timespec { + mut ts := C.timespec{} + C.clock_gettime(C.CLOCK_REALTIME, &ts) + d_sec := d / second + d_nsec := d % second + ts.tv_sec += d_sec + ts.tv_nsec += d_nsec + if ts.tv_nsec > second { + ts.tv_nsec -= second + ts.tv_sec++ + } + return ts +} + +// return timespec of 1970/1/1 +pub fn zero_timespec() C.timespec { + ts := C.timespec{ + tv_sec: 0 + tv_nsec: 0 + } + return ts +} diff --git a/vlib/v/tests/autolock_array2_test.v b/vlib/v/tests/autolock_array2_test.v index 737b7a43d8..4385bc6207 100644 --- a/vlib/v/tests/autolock_array2_test.v +++ b/vlib/v/tests/autolock_array2_test.v @@ -1,35 +1,27 @@ import sync -import time const ( iterations_per_thread2 = 100000 ) -fn inc_elements(shared foo []int, n int) { +fn inc_elements(shared foo []int, n int, sem sync.Semaphore) { for _ in 0 .. iterations_per_thread2 { foo[n]++ } - foo[0]++ // indicat that thread is finished + sem.post() // indicat that thread is finished } fn test_autolocked_array_2() { shared abc := &[0, 0, 0] - go inc_elements(shared abc, 1) - go inc_elements(shared abc, 2) + sem := sync.new_semaphore() + go inc_elements(shared abc, 1, sem) + go inc_elements(shared abc, 2, sem) for _ in 0 .. iterations_per_thread2 { abc[2]++ } - // wait for coroutines to finish - that should really be - // done by channels, yield, semaphore... - for { - mut finished_threads := 0 - rlock abc { - finished_threads = abc[0] - } - if finished_threads == 2 { - break - } - time.sleep_ms(100) + // wait for the 2 coroutines to finish using the semaphore + for _ in 0 .. 2 { + sem.wait() } rlock abc { assert abc[1] == iterations_per_thread2 diff --git a/vlib/v/tests/semaphore_test.v b/vlib/v/tests/semaphore_test.v new file mode 100644 index 0000000000..b6fe808ace --- /dev/null +++ b/vlib/v/tests/semaphore_test.v @@ -0,0 +1,25 @@ +import sync + +const ( + signals_per_thread = 10000000 +) + +fn send_signals(sem, sem_end sync.Semaphore) { + for _ in 0 .. signals_per_thread { + sem.post() + } + sem_end.post() +} + +fn test_semaphores() { + sem := sync.new_semaphore() + sem_end := sync.new_semaphore() + go send_signals(sem, sem_end) + go send_signals(sem, sem_end) + for _ in 0 .. 2 * signals_per_thread { + sem.wait() + } + sem_end.wait() + sem_end.wait() + assert true +} diff --git a/vlib/v/tests/semaphore_timed_test.v b/vlib/v/tests/semaphore_timed_test.v new file mode 100644 index 0000000000..2e61e0f054 --- /dev/null +++ b/vlib/v/tests/semaphore_timed_test.v @@ -0,0 +1,27 @@ +import sync +import time + +fn run_forever(shared foo []int, sem sync.Semaphore) { + for { + foo[0]++ + } + sem.post() // indicat that thread is finished - never happens +} + +fn test_semaphore() { + shared abc := &[0] + sem := sync.new_semaphore() + go run_forever(shared abc, sem) + for _ in 0 .. 100000 { + abc[0]-- + } + // wait for the 2 coroutines to finish using the semaphore + stopwatch := time.new_stopwatch({}) + mut elapsed := stopwatch.elapsed() + if !sem.timed_wait(500 * time.millisecond) { + // we should come here due to timeout + elapsed = stopwatch.elapsed() + } + println('elapsed: ${f64(elapsed)/time.second}s') + assert elapsed >= 495 * time.millisecond +}