From 3f5aa5e634b187ecd75a5ae28c744d6d787a320f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uwe=20Kr=C3=BCger?= <45282134+UweKrueger@users.noreply.github.com> Date: Fri, 18 Jun 2021 11:44:18 +0200 Subject: [PATCH] sync: make `Semaphore.*wait()` robust against interrupts by signals (#10491) --- vlib/sync/sync_default.c.v | 61 +++++++++- vlib/sync/sync_macos.c.v | 11 +- vlib/sync/sync_windows.c.v | 17 ++- vlib/time/time_windows.c.v | 2 + vlib/v/tests/reliability/semaphore_wait.v | 142 ++++++++++++++++++++++ 5 files changed, 223 insertions(+), 10 deletions(-) create mode 100644 vlib/v/tests/reliability/semaphore_wait.v diff --git a/vlib/sync/sync_default.c.v b/vlib/sync/sync_default.c.v index 37d0c27226..76f0b44faa 100644 --- a/vlib/sync/sync_default.c.v +++ b/vlib/sync/sync_default.c.v @@ -125,18 +125,69 @@ pub fn (mut sem Semaphore) post() { } pub fn (mut sem Semaphore) wait() { - C.sem_wait(&sem.sem) + for { + if C.sem_wait(&sem.sem) == 0 { + return + } + e := C.errno + match e { + C.EINTR { + continue // interrupted by signal + } + else { + panic(unsafe { tos_clone(&byte(C.strerror(C.errno))) }) + } + } + } } +// `try_wait()` should return as fast as possible so error handling is only +// done when debugging pub fn (mut sem Semaphore) try_wait() bool { - return C.sem_trywait(&sem.sem) == 0 + $if !debug { + return C.sem_trywait(&sem.sem) == 0 + } $else { + if C.sem_trywait(&sem.sem) != 0 { + e := C.errno + match e { + C.EAGAIN { + return false + } + else { + panic(unsafe { tos_clone(&byte(C.strerror(C.errno))) }) + } + } + } + return true + } } pub fn (mut sem Semaphore) timed_wait(timeout time.Duration) bool { t_spec := timeout.timespec() - return C.sem_timedwait(&sem.sem, &t_spec) == 0 + for { + if C.sem_timedwait(&sem.sem, &t_spec) == 0 { + return true + } + e := C.errno + match e { + C.EINTR { + continue // interrupted by signal + } + C.ETIMEDOUT { + break + } + else { + panic(unsafe { tos_clone(&byte(C.strerror(e))) }) + } + } + } + return false } -pub fn (sem Semaphore) destroy() bool { - return C.sem_destroy(&sem.sem) == 0 +pub fn (sem Semaphore) destroy() { + res := C.sem_destroy(&sem.sem) + if res == 0 { + return + } + panic(unsafe { tos_clone(&byte(C.strerror(res))) }) } diff --git a/vlib/sync/sync_macos.c.v b/vlib/sync/sync_macos.c.v index 9ee55dfac3..3f83198e61 100644 --- a/vlib/sync/sync_macos.c.v +++ b/vlib/sync/sync_macos.c.v @@ -220,6 +220,13 @@ pub fn (mut sem Semaphore) timed_wait(timeout time.Duration) bool { return res == 0 } -pub fn (mut sem Semaphore) destroy() bool { - return C.pthread_cond_destroy(&sem.cond) == 0 && C.pthread_mutex_destroy(&sem.mtx) == 0 +pub fn (mut sem Semaphore) destroy() { + mut res := C.pthread_cond_destroy(&sem.cond) + if res == 0 { + res = C.pthread_mutex_destroy(&sem.mtx) + if res == 0 { + return + } + } + panic(unsafe { tos_clone(&byte(C.strerror(res))) }) } diff --git a/vlib/sync/sync_windows.c.v b/vlib/sync/sync_windows.c.v index 7995336477..b610baef4d 100644 --- a/vlib/sync/sync_windows.c.v +++ b/vlib/sync/sync_windows.c.v @@ -6,7 +6,9 @@ module sync import time #include +#include +fn C.GetSystemTimeAsFileTime(lpSystemTimeAsFileTime &C._FILETIME) fn C.InitializeConditionVariable(voidptr) fn C.WakeConditionVariable(voidptr) fn C.SleepConditionVariableSRW(voidptr, voidptr, u32, u32) int @@ -170,8 +172,12 @@ pub fn (mut sem Semaphore) timed_wait(timeout time.Duration) bool { return true } } + mut ft_start := C._FILETIME{} + C.GetSystemTimeAsFileTime(&ft_start) + time_end := ((u64(ft_start.dwHighDateTime) << 32) | ft_start.dwLowDateTime) + + u64(timeout / (100 * time.nanosecond)) + mut t_ms := u32(timeout / time.millisecond) C.AcquireSRWLockExclusive(&sem.mtx) - t_ms := u32(timeout / time.millisecond) mut res := 0 c = C.atomic_load_u32(&sem.count) @@ -191,11 +197,16 @@ pub fn (mut sem Semaphore) timed_wait(timeout time.Duration) bool { break outer } } + C.GetSystemTimeAsFileTime(&ft_start) + time_now := ((u64(ft_start.dwHighDateTime) << 32) | ft_start.dwLowDateTime) // in 100ns + if time_now > time_end { + break outer // timeout exceeded + } + t_ms = u32((time_end - time_now) / 10000) } C.ReleaseSRWLockExclusive(&sem.mtx) return res != 0 } -pub fn (s Semaphore) destroy() bool { - return true +pub fn (s Semaphore) destroy() { } diff --git a/vlib/time/time_windows.c.v b/vlib/time/time_windows.c.v index ee33a40581..eb8845eb44 100644 --- a/vlib/time/time_windows.c.v +++ b/vlib/time/time_windows.c.v @@ -16,6 +16,8 @@ struct C.tm { } struct C._FILETIME { + dwLowDateTime u32 + dwHighDateTime u32 } struct SystemTime { diff --git a/vlib/v/tests/reliability/semaphore_wait.v b/vlib/v/tests/reliability/semaphore_wait.v new file mode 100644 index 0000000000..f480fcaa7b --- /dev/null +++ b/vlib/v/tests/reliability/semaphore_wait.v @@ -0,0 +1,142 @@ +/* +This program can be used to test waiting for a semaphore +in the presence of signals that might interrupt the `wait()` call. + +In particular the effect of Boehm-GC can be investigated. + +To do so compile this program with `v -gc boehm semaphore_wait.v` +and run is as `./semaphore_wait > /dev/null` to test `sem.wait()` +or `./semaphore_wait -t > /dev/null` to test `sem.timedwait()` +on Windows: `.\semaphore_wait > NUL` or `.\semaphore_wait -t > NUL` + +On success (no interrupted `wait`) the output (from stderr) should like like: + +time: 524.228 result: `true` +time: 521.122 result: `true` +time: 523.714 result: `true` +time: 527.001 result: `true` +time: 521.696 result: `true` +... +Finished + +(The "result" is only printed with `-t`.) +*/ +import os +import time +import rand +import math +import sync + +struct DataObj { +mut: + data []f64 +} + +struct PtrObj { +mut: + nxt []&DataObj +} + +struct PtrPtrObj { +mut: + nxt []&PtrObj +} + +const ( + log2n = 9 + n = 1 << log2n + n4 = f64(u64(1) << (4 * log2n)) +) + +fn waste_mem() { + mut objs := PtrPtrObj{ + nxt: []&PtrObj{len: n} + } + for { + sz := rand.int_in_range(10, 1000) + mut new_obj := &PtrObj{ + nxt: []&DataObj{len: sz} + } + sz2 := rand.int_in_range(10, 500000) + new_obj2 := &DataObj{ + data: []f64{len: sz2} + } + idx2 := rand.int_in_range(0, sz) + new_obj.nxt[idx2] = new_obj2 + // non-equally distributed random index + idx := int(math.sqrt(math.sqrt(rand.f64n(n4)))) + objs.nxt[idx] = new_obj + } +} + +fn do_rec(mut sem sync.Semaphore, timed bool) { + mut start := time.sys_mono_now() + for { + r := if timed { + sem.timed_wait(600 * time.millisecond) + } else { + sem.wait() + false + } + end := time.sys_mono_now() + dur := f64(end - start) / f64(time.millisecond) + res_str := if timed { ' result: `$r`' } else { '' } + if dur < 450.0 || dur > 550.0 { + eprintln('Problem: time: ${dur:.3f}$res_str') + } else { + eprintln('time: ${dur:.3f}$res_str') + } + start = end + } +} + +fn do_send(mut sem sync.Semaphore) { + for { + time.sleep(500 * time.millisecond) + sem.post() + } +} + +fn usage() { + eprintln('usage:\n\t${os.args[0]} [-t] [num_iterations]') + exit(1) +} + +fn main() { + mut n_iterations := 5_000_000 + mut timed := false + if os.args.len > 3 { + usage() + } + for i in 1 .. os.args.len { + if os.args[i][0].is_digit() { + if i > 1 && !timed { + usage() + } + n_iterations = os.args[1].int() + } else if os.args[i] == '-t' { + timed = true + } else { + usage() + } + } + if os.args.len > 3 || n_iterations <= 0 { + eprintln('usage:\n\t${os.args[0]} [num_iterations]') + exit(1) + } + mut sem := sync.new_semaphore() + go do_rec(mut sem, timed) + go do_send(mut sem) + for _ in 0 .. 4 { + go waste_mem() + } + mut last := time.sys_mono_now() + for _ in 0 .. n_iterations { + now := time.sys_mono_now() + interval := now - last + println(f64(interval) / f64(time.millisecond)) + last = now + } + sem.destroy() + eprintln('Finished') +}