sync: add semaphores (#5831)

pull/5836/head
Uwe Krüger 2020-07-15 10:22:33 +02:00 committed by GitHub
parent 6a260ad974
commit 8df6e59678
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 253 additions and 16 deletions

View File

@ -390,6 +390,9 @@ fn C.ReleaseMutex(voidptr) bool
fn C.CreateEvent(int, bool, bool, byteptr) voidptr fn C.CreateEvent(int, bool, bool, byteptr) voidptr
fn C.SetEvent(voidptr) int fn C.SetEvent(voidptr) int
fn C.CreateSemaphore(voidptr, int, int, voidptr) voidptr
fn C.ReleaseSemaphore(voidptr, int, voidptr) voidptr
fn C.InitializeSRWLock(voidptr) fn C.InitializeSRWLock(voidptr)
fn C.AcquireSRWLockShared(voidptr) fn C.AcquireSRWLockShared(voidptr)
fn C.AcquireSRWLockExclusive(voidptr) fn C.AcquireSRWLockExclusive(voidptr)
@ -409,6 +412,19 @@ fn C.pthread_rwlock_rdlock(voidptr) int
fn C.pthread_rwlock_wrlock(voidptr) int fn C.pthread_rwlock_wrlock(voidptr) int
fn C.pthread_rwlock_unlock(voidptr) int fn C.pthread_rwlock_unlock(voidptr) int
fn C.pthread_condattr_init(voidptr) int
fn C.pthread_condattr_setpshared(voidptr, int) int
fn C.pthread_cond_init(voidptr, voidptr) int
fn C.pthread_cond_signal(voidptr) int
fn C.pthread_cond_wait(voidptr, voidptr) int
fn C.pthread_cond_timedwait(voidptr, voidptr, voidptr) int
fn C.sem_init(voidptr, int, u32) int
fn C.sem_post(voidptr) int
fn C.sem_wait(voidptr) int
fn C.sem_trywait(voidptr) int
fn C.sem_timedwait(voidptr, voidptr) int
fn C.read(fd int, buf voidptr, count size_t) int fn C.read(fd int, buf voidptr, count size_t) int
fn C.write(fd int, buf voidptr, count size_t) int fn C.write(fd int, buf voidptr, count size_t) int
fn C.close(fd int) int fn C.close(fd int) int

View File

@ -3,7 +3,10 @@
// that can be found in the LICENSE file. // that can be found in the LICENSE file.
module sync module sync
import time
#flag -lpthread #flag -lpthread
#include <semaphore.h>
// [init_with=new_mutex] // TODO: implement support for this struct attribute, and disallow Mutex{} from outside the sync.new_mutex() function. // [init_with=new_mutex] // TODO: implement support for this struct attribute, and disallow Mutex{} from outside the sync.new_mutex() function.
[ref_only] [ref_only]
@ -21,6 +24,36 @@ struct RwMutexAttr {
attr C.pthread_rwlockattr_t attr C.pthread_rwlockattr_t
} }
/* MacOSX has no unnamed semaphores and no `timed_wait()` at all
so we emulate the behaviour with other devices */
struct MacOSX_Semaphore {
mtx C.pthread_mutex_t
cond C.pthread_cond_t
mut:
count int
}
[ref_only]
struct PosixSemaphore {
sem C.sem_t
}
[ref_only]
struct CondAttr {
attr C.pthread_condattr_t
}
pub struct Semaphore {
/*
$if macos {
sem &MacOSX_Semaphore
} $else {
sem &PosixSemaphore
}
*/
sem voidptr // since the above does not work, yet
}
pub fn new_mutex() &Mutex { pub fn new_mutex() &Mutex {
m := &Mutex{} m := &Mutex{}
C.pthread_mutex_init(&m.mutex, C.NULL) C.pthread_mutex_init(&m.mutex, C.NULL)
@ -65,3 +98,91 @@ pub fn (mut m RwMutex) r_unlock() {
pub fn (mut m RwMutex) w_unlock() { pub fn (mut m RwMutex) w_unlock() {
C.pthread_rwlock_unlock(&m.mutex) C.pthread_rwlock_unlock(&m.mutex)
} }
pub fn new_semaphore() Semaphore {
$if macos {
s := Semaphore{
sem: &MacOSX_Semaphore{count: 0}
}
C.pthread_mutex_init(&&MacOSX_Semaphore(s.sem).mtx, C.NULL)
a := &CondAttr{}
C.pthread_condattr_init(&a.attr)
C.pthread_condattr_setpshared(&a.attr, C.PTHREAD_PROCESS_PRIVATE)
C.pthread_cond_init(&&MacOSX_Semaphore(s.sem).cond, &a.attr)
return s
} $else {
s := Semaphore{
sem: &PosixSemaphore{}
}
C.sem_init(&&PosixSemaphore(s.sem).sem, 0, 0)
return s
}
}
pub fn (s Semaphore) post() {
$if macos {
C.pthread_mutex_lock(&&MacOSX_Semaphore(s.sem).mtx)
(&MacOSX_Semaphore(s.sem)).count++
C.pthread_cond_signal(&&MacOSX_Semaphore(s.sem).cond)
C.pthread_mutex_unlock(&&MacOSX_Semaphore(s.sem).mtx)
} $else {
C.sem_post(&&PosixSemaphore(s.sem).sem)
}
}
pub fn (s Semaphore) wait() {
$if macos {
C.pthread_mutex_lock(&&MacOSX_Semaphore(s.sem).mtx)
for &MacOSX_Semaphore(s.sem).count == 0 {
C.pthread_cond_wait(&&MacOSX_Semaphore(s.sem).cond, &&MacOSX_Semaphore(s.sem).mtx)
}
(&MacOSX_Semaphore(s.sem)).count--
C.pthread_mutex_unlock(&&MacOSX_Semaphore(s.sem).mtx)
} $else {
C.sem_wait(&&PosixSemaphore(s.sem).sem)
}
}
pub fn (s Semaphore) try_wait() bool {
$if macos {
t_spec := time.zero_timespec()
C.pthread_mutex_lock(&&MacOSX_Semaphore(s.sem).mtx)
for &MacOSX_Semaphore(s.sem).count == 0 {
res := C.pthread_cond_timedwait(&&MacOSX_Semaphore(s.sem).cond, &&MacOSX_Semaphore(s.sem).mtx, &t_spec)
if res == C.ETIMEDOUT {
break
}
}
mut res := false
if &MacOSX_Semaphore(s.sem).count > 0 { // success
(&MacOSX_Semaphore(s.sem)).count--
res = true
}
C.pthread_mutex_unlock(&&MacOSX_Semaphore(s.sem).mtx)
return res
} $else {
return C.sem_trywait(&&PosixSemaphore(s.sem).sem) == 0
}
}
pub fn (s Semaphore) timed_wait(timeout time.Duration) bool {
t_spec := timeout.timespec()
$if macos {
C.pthread_mutex_lock(&&MacOSX_Semaphore(s.sem).mtx)
for &MacOSX_Semaphore(s.sem).count == 0 {
res := C.pthread_cond_timedwait(&&MacOSX_Semaphore(s.sem).cond, &&MacOSX_Semaphore(s.sem).mtx, &t_spec)
if res == C.ETIMEDOUT {
break
}
}
mut res := false
if &MacOSX_Semaphore(s.sem).count > 0 { // success
(&MacOSX_Semaphore(s.sem)).count--
res = true
}
C.pthread_mutex_unlock(&&MacOSX_Semaphore(s.sem).mtx)
return res
} $else {
return C.sem_timedwait(&&PosixSemaphore(s.sem).sem, &t_spec) == 0
}
}

View File

@ -3,11 +3,15 @@
// that can be found in the LICENSE file. // that can be found in the LICENSE file.
module sync module sync
import time
// TODO: The suggestion of using CriticalSection instead of mutex // TODO: The suggestion of using CriticalSection instead of mutex
// was discussed. Needs consideration. // was discussed. Needs consideration.
// Mutex HANDLE // Mutex HANDLE
type MHANDLE voidptr type MHANDLE voidptr
// Semaphore HANDLE
type SHANDLE voidptr
//[init_with=new_mutex] // TODO: implement support for this struct attribute, and disallow Mutex{} from outside the sync.new_mutex() function. //[init_with=new_mutex] // TODO: implement support for this struct attribute, and disallow Mutex{} from outside the sync.new_mutex() function.
@ -28,6 +32,11 @@ mut:
mx C.SRWLOCK // mutex handle mx C.SRWLOCK // mutex handle
} }
pub struct Semaphore {
mut:
sem SHANDLE
}
enum MutexState { enum MutexState {
broken broken
waiting waiting
@ -118,3 +127,25 @@ pub fn (mut m Mutex) destroy() {
C.CloseHandle(m.mx) // destroy mutex C.CloseHandle(m.mx) // destroy mutex
m.state = .destroyed // setting up reference to invalid state m.state = .destroyed // setting up reference to invalid state
} }
pub fn new_semaphore() Semaphore {
return Semaphore{
sem: SHANDLE(C.CreateSemaphore(0, 0, C.INT32_MAX, 0))
}
}
pub fn (s Semaphore) post() {
C.ReleaseSemaphore(s.sem, 1, 0)
}
pub fn (s Semaphore) wait() {
C.WaitForSingleObject(s.sem, C.INFINITE)
}
pub fn (s Semaphore) try_wait() bool {
return C.WaitForSingleObject(s.sem, 0) == 0
}
pub fn (s Semaphore) timed_wait(timeout time.Duration) bool {
return C.WaitForSingleObject(s.sem, timeout / time.millisecond) == 0
}

View File

@ -35,6 +35,7 @@ type time_t voidptr
// in most systems, these are __quad_t, which is an i64 // in most systems, these are __quad_t, which is an i64
struct C.timespec { struct C.timespec {
mut:
tv_sec i64 tv_sec i64
tv_nsec i64 tv_nsec i64
} }
@ -78,3 +79,27 @@ pub struct C.timeval {
tv_sec u64 tv_sec u64
tv_usec u64 tv_usec u64
} }
// return absolute timespec for now()+d
pub fn (d Duration) timespec() C.timespec {
mut ts := C.timespec{}
C.clock_gettime(C.CLOCK_REALTIME, &ts)
d_sec := d / second
d_nsec := d % second
ts.tv_sec += d_sec
ts.tv_nsec += d_nsec
if ts.tv_nsec > second {
ts.tv_nsec -= second
ts.tv_sec++
}
return ts
}
// return timespec of 1970/1/1
pub fn zero_timespec() C.timespec {
ts := C.timespec{
tv_sec: 0
tv_nsec: 0
}
return ts
}

View File

@ -1,35 +1,27 @@
import sync import sync
import time
const ( const (
iterations_per_thread2 = 100000 iterations_per_thread2 = 100000
) )
fn inc_elements(shared foo []int, n int) { fn inc_elements(shared foo []int, n int, sem sync.Semaphore) {
for _ in 0 .. iterations_per_thread2 { for _ in 0 .. iterations_per_thread2 {
foo[n]++ foo[n]++
} }
foo[0]++ // indicat that thread is finished sem.post() // indicat that thread is finished
} }
fn test_autolocked_array_2() { fn test_autolocked_array_2() {
shared abc := &[0, 0, 0] shared abc := &[0, 0, 0]
go inc_elements(shared abc, 1) sem := sync.new_semaphore()
go inc_elements(shared abc, 2) go inc_elements(shared abc, 1, sem)
go inc_elements(shared abc, 2, sem)
for _ in 0 .. iterations_per_thread2 { for _ in 0 .. iterations_per_thread2 {
abc[2]++ abc[2]++
} }
// wait for coroutines to finish - that should really be // wait for the 2 coroutines to finish using the semaphore
// done by channels, yield, semaphore... for _ in 0 .. 2 {
for { sem.wait()
mut finished_threads := 0
rlock abc {
finished_threads = abc[0]
}
if finished_threads == 2 {
break
}
time.sleep_ms(100)
} }
rlock abc { rlock abc {
assert abc[1] == iterations_per_thread2 assert abc[1] == iterations_per_thread2

View File

@ -0,0 +1,25 @@
import sync
const (
signals_per_thread = 10000000
)
fn send_signals(sem, sem_end sync.Semaphore) {
for _ in 0 .. signals_per_thread {
sem.post()
}
sem_end.post()
}
fn test_semaphores() {
sem := sync.new_semaphore()
sem_end := sync.new_semaphore()
go send_signals(sem, sem_end)
go send_signals(sem, sem_end)
for _ in 0 .. 2 * signals_per_thread {
sem.wait()
}
sem_end.wait()
sem_end.wait()
assert true
}

View File

@ -0,0 +1,27 @@
import sync
import time
fn run_forever(shared foo []int, sem sync.Semaphore) {
for {
foo[0]++
}
sem.post() // indicat that thread is finished - never happens
}
fn test_semaphore() {
shared abc := &[0]
sem := sync.new_semaphore()
go run_forever(shared abc, sem)
for _ in 0 .. 100000 {
abc[0]--
}
// wait for the 2 coroutines to finish using the semaphore
stopwatch := time.new_stopwatch({})
mut elapsed := stopwatch.elapsed()
if !sem.timed_wait(500 * time.millisecond) {
// we should come here due to timeout
elapsed = stopwatch.elapsed()
}
println('elapsed: ${f64(elapsed)/time.second}s')
assert elapsed >= 495 * time.millisecond
}