sync: make `Semaphore.*wait()` robust against interrupts by signals (#10491)
parent
a98d644637
commit
3f5aa5e634
|
@ -125,18 +125,69 @@ pub fn (mut sem Semaphore) post() {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn (mut sem Semaphore) wait() {
|
pub fn (mut sem Semaphore) wait() {
|
||||||
C.sem_wait(&sem.sem)
|
for {
|
||||||
|
if C.sem_wait(&sem.sem) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
e := C.errno
|
||||||
|
match e {
|
||||||
|
C.EINTR {
|
||||||
|
continue // interrupted by signal
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
panic(unsafe { tos_clone(&byte(C.strerror(C.errno))) })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// `try_wait()` should return as fast as possible so error handling is only
|
||||||
|
// done when debugging
|
||||||
pub fn (mut sem Semaphore) try_wait() bool {
|
pub fn (mut sem Semaphore) try_wait() bool {
|
||||||
|
$if !debug {
|
||||||
return C.sem_trywait(&sem.sem) == 0
|
return C.sem_trywait(&sem.sem) == 0
|
||||||
|
} $else {
|
||||||
|
if C.sem_trywait(&sem.sem) != 0 {
|
||||||
|
e := C.errno
|
||||||
|
match e {
|
||||||
|
C.EAGAIN {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
panic(unsafe { tos_clone(&byte(C.strerror(C.errno))) })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn (mut sem Semaphore) timed_wait(timeout time.Duration) bool {
|
pub fn (mut sem Semaphore) timed_wait(timeout time.Duration) bool {
|
||||||
t_spec := timeout.timespec()
|
t_spec := timeout.timespec()
|
||||||
return C.sem_timedwait(&sem.sem, &t_spec) == 0
|
for {
|
||||||
|
if C.sem_timedwait(&sem.sem, &t_spec) == 0 {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
e := C.errno
|
||||||
|
match e {
|
||||||
|
C.EINTR {
|
||||||
|
continue // interrupted by signal
|
||||||
|
}
|
||||||
|
C.ETIMEDOUT {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
else {
|
||||||
|
panic(unsafe { tos_clone(&byte(C.strerror(e))) })
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn (sem Semaphore) destroy() bool {
|
pub fn (sem Semaphore) destroy() {
|
||||||
return C.sem_destroy(&sem.sem) == 0
|
res := C.sem_destroy(&sem.sem)
|
||||||
|
if res == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
panic(unsafe { tos_clone(&byte(C.strerror(res))) })
|
||||||
}
|
}
|
||||||
|
|
|
@ -220,6 +220,13 @@ pub fn (mut sem Semaphore) timed_wait(timeout time.Duration) bool {
|
||||||
return res == 0
|
return res == 0
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn (mut sem Semaphore) destroy() bool {
|
pub fn (mut sem Semaphore) destroy() {
|
||||||
return C.pthread_cond_destroy(&sem.cond) == 0 && C.pthread_mutex_destroy(&sem.mtx) == 0
|
mut res := C.pthread_cond_destroy(&sem.cond)
|
||||||
|
if res == 0 {
|
||||||
|
res = C.pthread_mutex_destroy(&sem.mtx)
|
||||||
|
if res == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
panic(unsafe { tos_clone(&byte(C.strerror(res))) })
|
||||||
}
|
}
|
||||||
|
|
|
@ -6,7 +6,9 @@ module sync
|
||||||
import time
|
import time
|
||||||
|
|
||||||
#include <synchapi.h>
|
#include <synchapi.h>
|
||||||
|
#include <time.h>
|
||||||
|
|
||||||
|
fn C.GetSystemTimeAsFileTime(lpSystemTimeAsFileTime &C._FILETIME)
|
||||||
fn C.InitializeConditionVariable(voidptr)
|
fn C.InitializeConditionVariable(voidptr)
|
||||||
fn C.WakeConditionVariable(voidptr)
|
fn C.WakeConditionVariable(voidptr)
|
||||||
fn C.SleepConditionVariableSRW(voidptr, voidptr, u32, u32) int
|
fn C.SleepConditionVariableSRW(voidptr, voidptr, u32, u32) int
|
||||||
|
@ -170,8 +172,12 @@ pub fn (mut sem Semaphore) timed_wait(timeout time.Duration) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
mut ft_start := C._FILETIME{}
|
||||||
|
C.GetSystemTimeAsFileTime(&ft_start)
|
||||||
|
time_end := ((u64(ft_start.dwHighDateTime) << 32) | ft_start.dwLowDateTime) +
|
||||||
|
u64(timeout / (100 * time.nanosecond))
|
||||||
|
mut t_ms := u32(timeout / time.millisecond)
|
||||||
C.AcquireSRWLockExclusive(&sem.mtx)
|
C.AcquireSRWLockExclusive(&sem.mtx)
|
||||||
t_ms := u32(timeout / time.millisecond)
|
|
||||||
mut res := 0
|
mut res := 0
|
||||||
c = C.atomic_load_u32(&sem.count)
|
c = C.atomic_load_u32(&sem.count)
|
||||||
|
|
||||||
|
@ -191,11 +197,16 @@ pub fn (mut sem Semaphore) timed_wait(timeout time.Duration) bool {
|
||||||
break outer
|
break outer
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
C.GetSystemTimeAsFileTime(&ft_start)
|
||||||
|
time_now := ((u64(ft_start.dwHighDateTime) << 32) | ft_start.dwLowDateTime) // in 100ns
|
||||||
|
if time_now > time_end {
|
||||||
|
break outer // timeout exceeded
|
||||||
|
}
|
||||||
|
t_ms = u32((time_end - time_now) / 10000)
|
||||||
}
|
}
|
||||||
C.ReleaseSRWLockExclusive(&sem.mtx)
|
C.ReleaseSRWLockExclusive(&sem.mtx)
|
||||||
return res != 0
|
return res != 0
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn (s Semaphore) destroy() bool {
|
pub fn (s Semaphore) destroy() {
|
||||||
return true
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -16,6 +16,8 @@ struct C.tm {
|
||||||
}
|
}
|
||||||
|
|
||||||
struct C._FILETIME {
|
struct C._FILETIME {
|
||||||
|
dwLowDateTime u32
|
||||||
|
dwHighDateTime u32
|
||||||
}
|
}
|
||||||
|
|
||||||
struct SystemTime {
|
struct SystemTime {
|
||||||
|
|
|
@ -0,0 +1,142 @@
|
||||||
|
/*
|
||||||
|
This program can be used to test waiting for a semaphore
|
||||||
|
in the presence of signals that might interrupt the `wait()` call.
|
||||||
|
|
||||||
|
In particular the effect of Boehm-GC can be investigated.
|
||||||
|
|
||||||
|
To do so compile this program with `v -gc boehm semaphore_wait.v`
|
||||||
|
and run is as `./semaphore_wait > /dev/null` to test `sem.wait()`
|
||||||
|
or `./semaphore_wait -t > /dev/null` to test `sem.timedwait()`
|
||||||
|
on Windows: `.\semaphore_wait > NUL` or `.\semaphore_wait -t > NUL`
|
||||||
|
|
||||||
|
On success (no interrupted `wait`) the output (from stderr) should like like:
|
||||||
|
|
||||||
|
time: 524.228 result: `true`
|
||||||
|
time: 521.122 result: `true`
|
||||||
|
time: 523.714 result: `true`
|
||||||
|
time: 527.001 result: `true`
|
||||||
|
time: 521.696 result: `true`
|
||||||
|
...
|
||||||
|
Finished
|
||||||
|
|
||||||
|
(The "result" is only printed with `-t`.)
|
||||||
|
*/
|
||||||
|
import os
|
||||||
|
import time
|
||||||
|
import rand
|
||||||
|
import math
|
||||||
|
import sync
|
||||||
|
|
||||||
|
struct DataObj {
|
||||||
|
mut:
|
||||||
|
data []f64
|
||||||
|
}
|
||||||
|
|
||||||
|
struct PtrObj {
|
||||||
|
mut:
|
||||||
|
nxt []&DataObj
|
||||||
|
}
|
||||||
|
|
||||||
|
struct PtrPtrObj {
|
||||||
|
mut:
|
||||||
|
nxt []&PtrObj
|
||||||
|
}
|
||||||
|
|
||||||
|
const (
|
||||||
|
log2n = 9
|
||||||
|
n = 1 << log2n
|
||||||
|
n4 = f64(u64(1) << (4 * log2n))
|
||||||
|
)
|
||||||
|
|
||||||
|
fn waste_mem() {
|
||||||
|
mut objs := PtrPtrObj{
|
||||||
|
nxt: []&PtrObj{len: n}
|
||||||
|
}
|
||||||
|
for {
|
||||||
|
sz := rand.int_in_range(10, 1000)
|
||||||
|
mut new_obj := &PtrObj{
|
||||||
|
nxt: []&DataObj{len: sz}
|
||||||
|
}
|
||||||
|
sz2 := rand.int_in_range(10, 500000)
|
||||||
|
new_obj2 := &DataObj{
|
||||||
|
data: []f64{len: sz2}
|
||||||
|
}
|
||||||
|
idx2 := rand.int_in_range(0, sz)
|
||||||
|
new_obj.nxt[idx2] = new_obj2
|
||||||
|
// non-equally distributed random index
|
||||||
|
idx := int(math.sqrt(math.sqrt(rand.f64n(n4))))
|
||||||
|
objs.nxt[idx] = new_obj
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn do_rec(mut sem sync.Semaphore, timed bool) {
|
||||||
|
mut start := time.sys_mono_now()
|
||||||
|
for {
|
||||||
|
r := if timed {
|
||||||
|
sem.timed_wait(600 * time.millisecond)
|
||||||
|
} else {
|
||||||
|
sem.wait()
|
||||||
|
false
|
||||||
|
}
|
||||||
|
end := time.sys_mono_now()
|
||||||
|
dur := f64(end - start) / f64(time.millisecond)
|
||||||
|
res_str := if timed { ' result: `$r`' } else { '' }
|
||||||
|
if dur < 450.0 || dur > 550.0 {
|
||||||
|
eprintln('Problem: time: ${dur:.3f}$res_str')
|
||||||
|
} else {
|
||||||
|
eprintln('time: ${dur:.3f}$res_str')
|
||||||
|
}
|
||||||
|
start = end
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn do_send(mut sem sync.Semaphore) {
|
||||||
|
for {
|
||||||
|
time.sleep(500 * time.millisecond)
|
||||||
|
sem.post()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn usage() {
|
||||||
|
eprintln('usage:\n\t${os.args[0]} [-t] [num_iterations]')
|
||||||
|
exit(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn main() {
|
||||||
|
mut n_iterations := 5_000_000
|
||||||
|
mut timed := false
|
||||||
|
if os.args.len > 3 {
|
||||||
|
usage()
|
||||||
|
}
|
||||||
|
for i in 1 .. os.args.len {
|
||||||
|
if os.args[i][0].is_digit() {
|
||||||
|
if i > 1 && !timed {
|
||||||
|
usage()
|
||||||
|
}
|
||||||
|
n_iterations = os.args[1].int()
|
||||||
|
} else if os.args[i] == '-t' {
|
||||||
|
timed = true
|
||||||
|
} else {
|
||||||
|
usage()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if os.args.len > 3 || n_iterations <= 0 {
|
||||||
|
eprintln('usage:\n\t${os.args[0]} [num_iterations]')
|
||||||
|
exit(1)
|
||||||
|
}
|
||||||
|
mut sem := sync.new_semaphore()
|
||||||
|
go do_rec(mut sem, timed)
|
||||||
|
go do_send(mut sem)
|
||||||
|
for _ in 0 .. 4 {
|
||||||
|
go waste_mem()
|
||||||
|
}
|
||||||
|
mut last := time.sys_mono_now()
|
||||||
|
for _ in 0 .. n_iterations {
|
||||||
|
now := time.sys_mono_now()
|
||||||
|
interval := now - last
|
||||||
|
println(f64(interval) / f64(time.millisecond))
|
||||||
|
last = now
|
||||||
|
}
|
||||||
|
sem.destroy()
|
||||||
|
eprintln('Finished')
|
||||||
|
}
|
Loading…
Reference in New Issue