diff --git a/vlib/sync/channel_fill_test.v b/vlib/sync/channel_fill_test.v
index cc68749810..2f02153b23 100644
--- a/vlib/sync/channel_fill_test.v
+++ b/vlib/sync/channel_fill_test.v
@@ -5,7 +5,7 @@ const (
 	queue_fill = 763
 )
 
-fn do_send(ch chan int, fin sync.Semaphore) {
+fn do_send(ch chan int, mut fin sync.Semaphore) {
 	for i in 0 .. queue_fill {
 		ch <- i
 	}
@@ -14,8 +14,8 @@ fn do_send(ch chan int, fin sync.Semaphore) {
 
 fn test_channel_len_cap() {
 	ch := chan int{cap: queue_len}
-	sem := sync.new_semaphore()
-	go do_send(ch, sem)
+	mut sem := sync.new_semaphore()
+	go do_send(ch, mut sem)
 	sem.wait()
 	assert ch.cap == queue_len
 	assert ch.len == queue_fill
diff --git a/vlib/sync/channel_opt_propagate_test.v b/vlib/sync/channel_opt_propagate_test.v
index 4bb4138aee..f9aea58d60 100644
--- a/vlib/sync/channel_opt_propagate_test.v
+++ b/vlib/sync/channel_opt_propagate_test.v
@@ -10,7 +10,7 @@ fn get_val_from_chan(ch chan i64) ?i64 {
 }
 
 // this function gets an array of channels for `i64`
-fn do_rec_calc_send(chs []chan i64, sem sync.Semaphore) {
+fn do_rec_calc_send(chs []chan i64, mut sem sync.Semaphore) {
 	mut msg := ''
 	for {
 		mut s := get_val_from_chan(chs[0]) or {
@@ -26,8 +26,8 @@ fn do_rec_calc_send(chs []chan i64, sem sync.Semaphore) {
 
 fn test_channel_array_mut() {
 	mut chs := [chan i64{}, chan i64{cap: 10}]
-	sem := sync.new_semaphore()
-	go do_rec_calc_send(chs, sem)
+	mut sem := sync.new_semaphore()
+	go do_rec_calc_send(chs, mut sem)
 	mut t := i64(100)
 	for _ in 0 .. num_iterations {
 		chs[0] <- t
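Since the `Semaphore` methods now take a mutable receiver, the tests declare the semaphore with `mut` and hand it to the spawned thread with `mut` as well. A minimal sketch of that calling pattern (the `worker` function and `n_msgs` constant are illustrative, not part of this patch):

```v
import sync

const (
	n_msgs = 10
)

fn worker(ch chan int, mut done sync.Semaphore) {
	mut sum := 0
	for _ in 0 .. n_msgs {
		sum += <-ch
	}
	println('worker consumed $n_msgs messages, sum = $sum')
	done.post() // signal the spawning thread that we are finished
}

fn main() {
	ch := chan int{cap: n_msgs}
	mut done := sync.new_semaphore()
	go worker(ch, mut done)
	for i in 0 .. n_msgs {
		ch <- i
	}
	done.wait() // block until the worker has drained the channel
}
```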
diff --git a/vlib/sync/channel_select_3_test.v b/vlib/sync/channel_select_3_test.v
index c78dca5d60..26dda99aca 100644
--- a/vlib/sync/channel_select_3_test.v
+++ b/vlib/sync/channel_select_3_test.v
@@ -9,7 +9,7 @@ fn getint() int {
 	return 8
 }
 
-fn f1(ch1 chan int, ch2 chan St, ch3 chan int, ch4 chan int, ch5 chan int, sem sync.Semaphore) {
+fn f1(ch1 chan int, ch2 chan St, ch3 chan int, ch4 chan int, ch5 chan int, mut sem sync.Semaphore) {
 	mut a := 5
 	select {
 		a = <-ch3 {
@@ -40,7 +40,7 @@ fn f1(ch1 chan int, ch2 chan St, ch3 chan int, ch4 chan int, ch5 chan int, sem s
 	sem.post()
 }
 
-fn f2(ch1 chan St, ch2 chan int, sem sync.Semaphore) {
+fn f2(ch1 chan St, ch2 chan int, mut sem sync.Semaphore) {
 	mut r := 23
 	for i in 0 .. 2 {
 		select {
@@ -66,7 +66,7 @@ fn test_select_blocks() {
 	ch3 := chan int{}
 	ch4 := chan int{}
 	ch5 := chan int{}
-	sem := sync.new_semaphore()
+	mut sem := sync.new_semaphore()
 	mut r := false
 	t := select {
 		b := <-ch1 {
@@ -79,7 +79,7 @@
 	}
 	assert r == true
 	assert t == true
-	go f2(ch2, ch3, sem)
+	go f2(ch2, ch3, mut sem)
 	n := <-ch3
 	assert n == 23
 	ch2 <- St{
@@ -87,7 +87,7 @@
 	}
 	sem.wait()
 	stopwatch := time.new_stopwatch({})
-	go f1(ch1, ch2, ch3, ch4, ch5, sem)
+	go f1(ch1, ch2, ch3, ch4, ch5, mut sem)
 	sem.wait()
 	elapsed_ms := f64(stopwatch.elapsed()) / time.millisecond
 	// https://docs.microsoft.com/en-us/windows-hardware/drivers/kernel/high-resolution-timers
diff --git a/vlib/sync/channels.v b/vlib/sync/channels.v
index a6e4291556..a038fff1ec 100644
--- a/vlib/sync/channels.v
+++ b/vlib/sync/channels.v
@@ -66,7 +66,7 @@ enum BufferElemStat {
 
 struct Subscription {
 mut:
-	sem  Semaphore
+	sem  &Semaphore
 	prev &&Subscription
 	nxt  &Subscription
 }
@@ -77,14 +77,14 @@ enum Direction {
 }
 
 struct Channel {
-	writesem    Semaphore // to wake thread that wanted to write, but buffer was full
-	readsem     Semaphore // to wake thread that wanted to read, but buffer was empty
-	writesem_im Semaphore
-	readsem_im  Semaphore
 	ringbuf     byteptr // queue for buffered channels
 	statusbuf   byteptr // flags to synchronize write/read in ringbuf
 	objsize     u32
 mut: // atomic
+	writesem    Semaphore // to wake thread that wanted to write, but buffer was full
+	readsem     Semaphore // to wake thread that wanted to read, but buffer was empty
+	writesem_im Semaphore
+	readsem_im  Semaphore
 	write_adr   C.atomic_uintptr_t // if != NULL the next obj can be written here without wait
 	read_adr    C.atomic_uintptr_t // if != NULL an obj can be read from here without wait
 	adr_read    C.atomic_uintptr_t // used to identify origin of writesem
@@ -113,11 +113,7 @@ fn new_channel_st(n u32, st u32) &Channel {
 	rsem := if n > 0 { u32(0) } else { 1 }
 	rbuf := if n > 0 { malloc(int(n * st)) } else { byteptr(0) }
 	sbuf := if n > 0 { vcalloc(int(n * 2)) } else { byteptr(0) }
-	return &Channel{
-		writesem: new_semaphore_init(wsem)
-		readsem: new_semaphore_init(rsem)
-		writesem_im: new_semaphore()
-		readsem_im: new_semaphore()
+	mut ch := &Channel{
 		objsize: st
 		cap: n
 		write_free: n
@@ -127,6 +123,11 @@ fn new_channel_st(n u32, st u32) &Channel {
 		write_subscriber: 0
 		read_subscriber: 0
 	}
+	ch.writesem.init(wsem)
+	ch.readsem.init(rsem)
+	ch.writesem_im.init(0)
+	ch.readsem_im.init(0)
+	return ch
 }
 
 pub fn (mut ch Channel) close() {
@@ -528,9 +529,10 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo
 	assert channels.len == dir.len
 	assert dir.len == objrefs.len
 	mut subscr := []Subscription{len: channels.len}
-	sem := new_semaphore()
+	mut sem := Semaphore{}
+	sem.init(0)
 	for i, ch in channels {
-		subscr[i].sem = sem
+		subscr[i].sem = &sem
 		if dir[i] == .push {
 			mut null16 := u16(0)
 			for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) {
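With `Semaphore` now being a plain value struct (one per platform backend, see the new files below), `Channel` embeds its four semaphores directly and initializes them in place with `init()` instead of allocating each one via `new_semaphore_init()`; `channel_select()` likewise keeps a single `Semaphore` on the stack and stores `&sem` in every `Subscription`. The embed-then-init pattern looks like this inside the module (a sketch only; `Work`/`new_work` are hypothetical names):

```v
module sync

// Hypothetical example of the same pattern new_channel_st() now uses:
// embed the semaphores by value, then initialize them in place.
struct Work {
mut:
	start Semaphore // no separate heap object per semaphore anymore
	done  Semaphore
}

fn new_work() &Work {
	mut w := &Work{}
	w.start.init(0)
	w.done.init(1) // start with one permit
	return w
}
```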
diff --git a/vlib/sync/sync_default.c.v b/vlib/sync/sync_default.c.v
new file mode 100644
index 0000000000..744fb15e15
--- /dev/null
+++ b/vlib/sync/sync_default.c.v
@@ -0,0 +1,134 @@
+// Copyright (c) 2019-2021 Alexander Medvednikov. All rights reserved.
+// Use of this source code is governed by an MIT license
+// that can be found in the LICENSE file.
+module sync
+
+import time
+
+#flag -lpthread
+#include <semaphore.h>
+
+[trusted]
+fn C.pthread_mutex_init(voidptr, voidptr) int
+fn C.pthread_mutex_lock(voidptr) int
+fn C.pthread_mutex_unlock(voidptr) int
+fn C.pthread_mutex_destroy(voidptr) int
+fn C.pthread_rwlockattr_init(voidptr) int
+fn C.pthread_rwlockattr_setkind_np(voidptr, int) int
+fn C.pthread_rwlockattr_setpshared(voidptr, int) int
+fn C.pthread_rwlock_init(voidptr, voidptr) int
+fn C.pthread_rwlock_rdlock(voidptr) int
+fn C.pthread_rwlock_wrlock(voidptr) int
+fn C.pthread_rwlock_unlock(voidptr) int
+fn C.sem_init(voidptr, int, u32) int
+fn C.sem_post(voidptr) int
+fn C.sem_wait(voidptr) int
+fn C.sem_trywait(voidptr) int
+fn C.sem_timedwait(voidptr, voidptr) int
+fn C.sem_destroy(voidptr) int
+
+// [init_with=new_mutex] // TODO: implement support for this struct attribute, and disallow Mutex{} from outside the sync.new_mutex() function.
+pub struct Mutex {
+	mutex C.pthread_mutex_t
+}
+
+pub struct RwMutex {
+	mutex C.pthread_rwlock_t
+}
+
+struct RwMutexAttr {
+	attr C.pthread_rwlockattr_t
+}
+
+struct Semaphore {
+	sem C.sem_t
+}
+
+pub fn new_mutex() &Mutex {
+	mut m := &Mutex{}
+	m.init()
+	return m
+}
+
+pub fn (mut m Mutex) init() {
+	C.pthread_mutex_init(&m.mutex, C.NULL)
+}
+
+pub fn new_rwmutex() &RwMutex {
+	mut m := &RwMutex{}
+	m.init()
+	return m
+}
+
+pub fn (mut m RwMutex) init() {
+	a := RwMutexAttr{}
+	C.pthread_rwlockattr_init(&a.attr)
+	// Give writer priority over readers
+	C.pthread_rwlockattr_setkind_np(&a.attr, C.PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP)
+	C.pthread_rwlockattr_setpshared(&a.attr, C.PTHREAD_PROCESS_PRIVATE)
+	C.pthread_rwlock_init(&m.mutex, &a.attr)
+}
+
+// m_lock(), for *manual* mutex handling, since `lock` is a keyword
+pub fn (mut m Mutex) m_lock() {
+	C.pthread_mutex_lock(&m.mutex)
+}
+
+pub fn (mut m Mutex) unlock() {
+	C.pthread_mutex_unlock(&m.mutex)
+}
+
+// RwMutex has separate read- and write locks
+pub fn (mut m RwMutex) r_lock() {
+	C.pthread_rwlock_rdlock(&m.mutex)
+}
+
+pub fn (mut m RwMutex) w_lock() {
+	C.pthread_rwlock_wrlock(&m.mutex)
+}
+
+// Windows SRWLocks have different function to unlock
+// So provide two functions here, too, to have a common interface
+pub fn (mut m RwMutex) r_unlock() {
+	C.pthread_rwlock_unlock(&m.mutex)
+}
+
+pub fn (mut m RwMutex) w_unlock() {
+	C.pthread_rwlock_unlock(&m.mutex)
+}
+
+[inline]
+pub fn new_semaphore() &Semaphore {
+	return new_semaphore_init(0)
+}
+
+pub fn new_semaphore_init(n u32) &Semaphore {
+	mut sem := &Semaphore{}
+	sem.init(n)
+	return sem
+}
+
+pub fn (mut sem Semaphore) init(n u32) {
+	C.sem_init(&sem.sem, 0, n)
+}
+
+pub fn (mut sem Semaphore) post() {
+	C.sem_post(&sem.sem)
+}
+
+pub fn (mut sem Semaphore) wait() {
+	C.sem_wait(&sem.sem)
+}
+
+pub fn (mut sem Semaphore) try_wait() bool {
+	return C.sem_trywait(&sem.sem) == 0
+}
+
+pub fn (mut sem Semaphore) timed_wait(timeout time.Duration) bool {
+	t_spec := timeout.timespec()
+	return C.sem_timedwait(&sem.sem, &t_spec) == 0
+}
+
+pub fn (sem Semaphore) destroy() bool {
+	return C.sem_destroy(&sem.sem) == 0
+}
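On Linux and the other non-macOS POSIX targets the new `Semaphore` is a thin wrapper over `sem_t`, so `try_wait()` and `timed_wait()` map directly onto `sem_trywait()`/`sem_timedwait()`. A hedged usage sketch of that API (the resource-pool framing is only an example, not taken from the patch):

```v
import sync
import time

fn main() {
	mut slots := sync.new_semaphore_init(2) // counting semaphore with 2 permits
	assert slots.try_wait() // takes the first permit without blocking
	assert slots.try_wait() // takes the second
	assert !slots.try_wait() // empty now, returns false immediately
	slots.post() // hand one permit back
	// timed_wait() reports whether a permit became available in time
	assert slots.timed_wait(100 * time.millisecond)
	slots.destroy()
}
```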
diff --git a/vlib/sync/sync_macos.c.v b/vlib/sync/sync_macos.c.v
new file mode 100644
index 0000000000..c92d389087
--- /dev/null
+++ b/vlib/sync/sync_macos.c.v
@@ -0,0 +1,212 @@
+// Copyright (c) 2019-2021 Alexander Medvednikov. All rights reserved.
+// Use of this source code is governed by an MIT license
+// that can be found in the LICENSE file.
+module sync
+
+import time
+
+#flag -lpthread
+#include <pthread.h>
+
+[trusted]
+fn C.pthread_mutex_init(voidptr, voidptr) int
+fn C.pthread_mutex_lock(voidptr) int
+fn C.pthread_mutex_unlock(voidptr) int
+fn C.pthread_mutex_destroy(voidptr) int
+fn C.pthread_rwlockattr_init(voidptr) int
+fn C.pthread_rwlockattr_setkind_np(voidptr, int) int
+fn C.pthread_rwlockattr_setpshared(voidptr, int) int
+fn C.pthread_rwlock_init(voidptr, voidptr) int
+fn C.pthread_rwlock_rdlock(voidptr) int
+fn C.pthread_rwlock_wrlock(voidptr) int
+fn C.pthread_rwlock_unlock(voidptr) int
+fn C.pthread_condattr_init(voidptr) int
+fn C.pthread_condattr_setpshared(voidptr, int) int
+fn C.pthread_condattr_destroy(voidptr) int
+fn C.pthread_cond_init(voidptr, voidptr) int
+fn C.pthread_cond_signal(voidptr) int
+fn C.pthread_cond_wait(voidptr, voidptr) int
+fn C.pthread_cond_timedwait(voidptr, voidptr, voidptr) int
+fn C.pthread_cond_destroy(voidptr) int
+
+// [init_with=new_mutex] // TODO: implement support for this struct attribute, and disallow Mutex{} from outside the sync.new_mutex() function.
+pub struct Mutex {
+	mutex C.pthread_mutex_t
+}
+
+pub struct RwMutex {
+	mutex C.pthread_rwlock_t
+}
+
+struct RwMutexAttr {
+	attr C.pthread_rwlockattr_t
+}
+
+struct CondAttr {
+	attr C.pthread_condattr_t
+}
+
+/* MacOSX has no unnamed semaphores and no `timed_wait()` at all
+   so we emulate the behaviour with other devices */
+struct Semaphore {
+	mtx  C.pthread_mutex_t
+	cond C.pthread_cond_t
+mut:
+	count u32
+}
+
+pub fn new_mutex() &Mutex {
+	mut m := &Mutex{}
+	m.init()
+	return m
+}
+
+pub fn (mut m Mutex) init() {
+	C.pthread_mutex_init(&m.mutex, C.NULL)
+}
+
+pub fn new_rwmutex() &RwMutex {
+	mut m := &RwMutex{}
+	m.init()
+	return m
+}
+
+pub fn (mut m RwMutex) init() {
+	a := RwMutexAttr{}
+	C.pthread_rwlockattr_init(&a.attr)
+	// Give writer priority over readers
+	C.pthread_rwlockattr_setkind_np(&a.attr, C.PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP)
+	C.pthread_rwlockattr_setpshared(&a.attr, C.PTHREAD_PROCESS_PRIVATE)
+	C.pthread_rwlock_init(&m.mutex, &a.attr)
+}
+
+// m_lock(), for *manual* mutex handling, since `lock` is a keyword
+pub fn (mut m Mutex) m_lock() {
+	C.pthread_mutex_lock(&m.mutex)
+}
+
+pub fn (mut m Mutex) unlock() {
+	C.pthread_mutex_unlock(&m.mutex)
+}
+
+// RwMutex has separate read- and write locks
+pub fn (mut m RwMutex) r_lock() {
+	C.pthread_rwlock_rdlock(&m.mutex)
+}
+
+pub fn (mut m RwMutex) w_lock() {
+	C.pthread_rwlock_wrlock(&m.mutex)
+}
+
+// Windows SRWLocks have different function to unlock
+// So provide two functions here, too, to have a common interface
+pub fn (mut m RwMutex) r_unlock() {
+	C.pthread_rwlock_unlock(&m.mutex)
+}
+
+pub fn (mut m RwMutex) w_unlock() {
+	C.pthread_rwlock_unlock(&m.mutex)
+}
+
+[inline]
+pub fn new_semaphore() &Semaphore {
+	return new_semaphore_init(0)
+}
+
+pub fn new_semaphore_init(n u32) &Semaphore {
+	mut sem := &Semaphore{}
+	sem.init(n)
+	return sem
+}
+
+pub fn (mut sem Semaphore) init(n u32) {
+	C.atomic_store_u32(&sem.count, n)
+	C.pthread_mutex_init(&sem.mtx, C.NULL)
+	attr := CondAttr{}
+	C.pthread_condattr_init(&attr.attr)
+	C.pthread_condattr_setpshared(&attr.attr, C.PTHREAD_PROCESS_PRIVATE)
+	C.pthread_cond_init(&sem.cond, &attr.attr)
+	C.pthread_condattr_destroy(&attr.attr)
+}
+
+pub fn (mut sem Semaphore) post() {
+	mut c := C.atomic_load_u32(&sem.count)
+	for c > 1 {
+		if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c+1) { return }
+	}
+	C.pthread_mutex_lock(&sem.mtx)
+	c = C.atomic_fetch_add_u32(&sem.count, 1)
+	if c == 0 {
+		C.pthread_cond_signal(&sem.cond)
+	}
+	C.pthread_mutex_unlock(&sem.mtx)
+}
+
+pub fn (mut sem Semaphore) wait() {
+	mut c := C.atomic_load_u32(&sem.count)
+	for c > 0 {
+		if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c-1) { return }
+	}
+	C.pthread_mutex_lock(&sem.mtx)
+	c = C.atomic_load_u32(&sem.count)
+	for {
+		if c == 0 {
+			C.pthread_cond_wait(&sem.cond, &sem.mtx)
+			c = C.atomic_load_u32(&sem.count)
+		}
+		for c > 0 {
+			if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c-1) {
+				if c > 1 {
+					C.pthread_cond_signal(&sem.cond)
+				}
+				goto unlock
+			}
+		}
+	}
+unlock:
+	C.pthread_mutex_unlock(&sem.mtx)
+}
+
+pub fn (mut sem Semaphore) try_wait() bool {
+	mut c := C.atomic_load_u32(&sem.count)
+	for c > 0 {
+		if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c-1) { return true }
+	}
+	return false
+}
+
+pub fn (mut sem Semaphore) timed_wait(timeout time.Duration) bool {
+	mut c := C.atomic_load_u32(&sem.count)
+	for c > 0 {
+		if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c-1) { return true }
+	}
+	C.pthread_mutex_lock(&sem.mtx)
+	t_spec := timeout.timespec()
+	mut res := 0
+	c = C.atomic_load_u32(&sem.count)
+	for {
+		if c == 0 {
+			res = C.pthread_cond_timedwait(&sem.cond, &sem.mtx, &t_spec)
+			if res == C.ETIMEDOUT {
+				goto unlock
+			}
+			c = C.atomic_load_u32(&sem.count)
+		}
+		for c > 0 {
+			if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c-1) {
+				if c > 1 {
+					C.pthread_cond_signal(&sem.cond)
+				}
+				goto unlock
+			}
+		}
+	}
+unlock:
+	C.pthread_mutex_unlock(&sem.mtx)
+	return res == 0
+}
+
+pub fn (mut sem Semaphore) destroy() bool {
+	return C.pthread_cond_destroy(&sem.cond) == 0 &&
+		C.pthread_mutex_destroy(&sem.mtx) == 0
+}
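The macOS backend keeps the permit count in an atomic `u32`: `post()` and `wait()` first try a lock-free compare-and-swap and only fall back to the `pthread_mutex_t`/`pthread_cond_t` pair when the count sits at the contended 0/1 boundary. User code just sees ordinary counting-semaphore behaviour, for example limiting concurrency to a fixed number of slots (illustrative sketch, none of these names come from the patch):

```v
import sync

const (
	max_parallel = 2
	n_tasks      = 5
)

fn limited_task(id int, mut slots sync.Semaphore, mut done sync.Semaphore) {
	slots.wait() // at most `max_parallel` tasks get past this point at once
	println('task $id running')
	slots.post() // release the slot
	done.post() // report completion
}

fn main() {
	mut slots := sync.new_semaphore_init(max_parallel)
	mut done := sync.new_semaphore()
	for i in 0 .. n_tasks {
		go limited_task(i, mut slots, mut done)
	}
	for _ in 0 .. n_tasks {
		done.wait()
	}
}
```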
diff --git a/vlib/sync/sync_nix.c.v b/vlib/sync/sync_nix.c.v
deleted file mode 100644
index 2972c04d1a..0000000000
--- a/vlib/sync/sync_nix.c.v
+++ /dev/null
@@ -1,153 +0,0 @@
-// Copyright (c) 2019-2021 Alexander Medvednikov. All rights reserved.
-// Use of this source code is governed by an MIT license
-// that can be found in the LICENSE file.
-module sync
-
-import time
-
-#flag -lpthread
-$if macos {
-	#include <dispatch/dispatch.h>
-}
-#include <semaphore.h>
-
-// [init_with=new_mutex] // TODO: implement support for this struct attribute, and disallow Mutex{} from outside the sync.new_mutex() function.
-[ref_only]
-pub struct Mutex {
-	mutex C.pthread_mutex_t
-}
-
-[ref_only]
-pub struct RwMutex {
-	mutex C.pthread_rwlock_t
-}
-
-[ref_only]
-struct RwMutexAttr {
-	attr C.pthread_rwlockattr_t
-}
-
-/* MacOSX has no unnamed semaphores and no `timed_wait()` at all
-   so we emulate the behaviour with other devices */
-// [ref_only]
-// struct MacOSX_Semaphore {
-//	sem C.dispatch_semaphore_t
-// }
-
-[ref_only]
-struct PosixSemaphore {
-	sem C.sem_t
-}
-
-pub struct Semaphore {
-mut:
-	sem voidptr
-}
-
-pub fn new_mutex() &Mutex {
-	m := &Mutex{}
-	C.pthread_mutex_init(&m.mutex, C.NULL)
-	return m
-}
-
-pub fn new_rwmutex() &RwMutex {
-	m := &RwMutex{}
-	a := &RwMutexAttr{}
-	C.pthread_rwlockattr_init(&a.attr)
-	// Give writer priority over readers
-	C.pthread_rwlockattr_setkind_np(&a.attr, C.PTHREAD_RWLOCK_PREFER_WRITER_NONRECURSIVE_NP)
-	C.pthread_rwlockattr_setpshared(&a.attr, C.PTHREAD_PROCESS_PRIVATE)
-	C.pthread_rwlock_init(&m.mutex, &a.attr)
-	return m
-}
-
-// m_lock(), for *manual* mutex handling, since `lock` is a keyword
-pub fn (mut m Mutex) m_lock() {
-	C.pthread_mutex_lock(&m.mutex)
-}
-
-pub fn (mut m Mutex) unlock() {
-	C.pthread_mutex_unlock(&m.mutex)
-}
-
-// RwMutex has separate read- and write locks
-pub fn (mut m RwMutex) r_lock() {
-	C.pthread_rwlock_rdlock(&m.mutex)
-}
-
-pub fn (mut m RwMutex) w_lock() {
-	C.pthread_rwlock_wrlock(&m.mutex)
-}
-
-// Windows SRWLocks have different function to unlock
-// So provide two functions here, too, to have a common interface
-pub fn (mut m RwMutex) r_unlock() {
-	C.pthread_rwlock_unlock(&m.mutex)
-}
-
-pub fn (mut m RwMutex) w_unlock() {
-	C.pthread_rwlock_unlock(&m.mutex)
-}
-
-[inline]
-pub fn new_semaphore() Semaphore {
-	return new_semaphore_init(0)
-}
-
-pub fn new_semaphore_init(n u32) Semaphore {
-	$if macos {
-		s := Semaphore{
-			sem: C.dispatch_semaphore_create(n)
-		}
-		return s
-	} $else {
-		s := Semaphore{
-			sem: &PosixSemaphore{}
-		}
-		unsafe { C.sem_init(&&PosixSemaphore(s.sem).sem, 0, n) }
-		return s
-	}
-}
-
-pub fn (s Semaphore) post() {
-	$if macos {
-		C.dispatch_semaphore_signal(s.sem)
-	} $else {
-		unsafe { C.sem_post(&&PosixSemaphore(s.sem).sem) }
-	}
-}
-
-pub fn (s Semaphore) wait() {
-	$if macos {
-		C.dispatch_semaphore_wait(s.sem, C.DISPATCH_TIME_FOREVER)
-	} $else {
-		unsafe { C.sem_wait(&&PosixSemaphore(s.sem).sem) }
-	}
-}
-
-pub fn (s Semaphore) try_wait() bool {
-	$if macos {
-		return C.dispatch_semaphore_wait(s.sem, C.DISPATCH_TIME_NOW) == 0
-	} $else {
-		return unsafe { C.sem_trywait(&&PosixSemaphore(s.sem).sem) == 0 }
-	}
-}
-
-pub fn (s Semaphore) timed_wait(timeout time.Duration) bool {
-	$if macos {
-		return C.dispatch_semaphore_wait(s.sem, C.dispatch_time(C.DISPATCH_TIME_NOW, i64(timeout))) == 0
-	} $else {
-		t_spec := timeout.timespec()
-		return unsafe { C.sem_timedwait(&&PosixSemaphore(s.sem).sem, &t_spec) == 0 }
-	}
-}
-
-pub fn (s Semaphore) destroy() bool {
-	$if macos {
-		for s.try_wait() {}
-		C.dispatch_release(s.sem)
-		return true
-	} $else {
-		return unsafe { C.sem_destroy(&&PosixSemaphore(s.sem).sem) == 0 }
-	}
-}
diff --git a/vlib/sync/sync_windows.c.v b/vlib/sync/sync_windows.c.v
index 10da2ffe26..8f9cc31c28 100644
--- a/vlib/sync/sync_windows.c.v
+++ b/vlib/sync/sync_windows.c.v
@@ -5,6 +5,10 @@ module sync
 
 import time
 
+fn C.InitializeConditionVariable(voidptr)
+fn C.WakeConditionVariable(voidptr)
+fn C.SleepConditionVariableSRW(voidptr, voidptr, u32, u32) int
+
 // TODO: The suggestion of using CriticalSection instead of mutex
 // was discussed. Needs consideration.
 
@@ -15,90 +19,50 @@ type SHANDLE = voidptr
 
 //[init_with=new_mutex] // TODO: implement support for this struct attribute, and disallow Mutex{} from outside the sync.new_mutex() function.
-[ref_only]
+// `SRWLOCK` is much more performant that `Mutex` on Windows, so use that in both cases since we don't want to share with other processes
 pub struct Mutex {
 mut:
-	mx          MHANDLE    // mutex handle
-	state       MutexState // mutex state
-	cycle_wait  i64        // waiting cycles (implemented only with atomic)
-	cycle_woken i64        // woken cycles ^
-	reader_sem  u32        // reader semarphone
-	writer_sem  u32        // writer semaphones
+	mx C.SRWLOCK // mutex handle
 }
 
-[ref_only]
 pub struct RwMutex {
 mut:
 	mx C.SRWLOCK // mutex handle
 }
 
-pub struct Semaphore {
+struct Semaphore {
+	mtx  C.SRWLOCK
+	cond C.CONDITION_VARIABLE
 mut:
-	sem SHANDLE
-}
-
-enum MutexState {
-	broken
-	waiting
-	released
-	abandoned
-	destroyed
+	count u32
 }
 
 pub fn new_mutex() &Mutex {
-	sm := &Mutex{}
-	unsafe {
-		mut m := sm
-		m.mx = MHANDLE(C.CreateMutex(0, false, 0))
-		if isnil(m.mx) {
-			m.state = .broken // handle broken and mutex state are broken
-			return sm
-		}
-	}
-	return sm
-}
-
-pub fn new_rwmutex() &RwMutex {
-	m := &RwMutex{}
-	C.InitializeSRWLock(&m.mx)
+	mut m := &Mutex{}
+	m.init()
 	return m
 }
 
+pub fn new_rwmutex() &RwMutex {
+	mut m := &RwMutex{}
+	m.init()
+	return m
+}
+
+pub fn (mut m Mutex) init() {
+	C.InitializeSRWLock(&m.mx)
+}
+
+pub fn (mut m RwMutex) init() {
+	C.InitializeSRWLock(&m.mx)
+}
+
 pub fn (mut m Mutex) m_lock() {
-	// if mutex handle not initalized
-	if isnil(m.mx) {
-		m.mx = MHANDLE(C.CreateMutex(0, false, 0))
-		if isnil(m.mx) {
-			m.state = .broken // handle broken and mutex state are broken
-			return
-		}
-	}
-	state := C.WaitForSingleObject(m.mx, C.INFINITE) // infinite wait
-	/* TODO fix match/enum combo
-	m.state = match state {
-		C.WAIT_ABANDONED { .abandoned }
-		C.WAIT_OBJECT_0 { .waiting }
-		else { .broken }
-	}
-	*/
-	if state == C.WAIT_ABANDONED {
-		m.state = .abandoned
-		// FIXME Use C constant instead
-	} else if state == 0 /* C.WAIT_OBJECT_0 */ {
-		m.state = .waiting
-	} else {
-		m.state = .broken
-	}
+	C.AcquireSRWLockExclusive(&m.mx)
 }
 
 pub fn (mut m Mutex) unlock() {
-	if m.state == .waiting {
-		if C.ReleaseMutex(m.mx) {
-			m.state = .broken
-			return
-		}
-	}
-	m.state = .released
+	C.ReleaseSRWLockExclusive(&m.mx)
 }
 
 // RwMutex has separate read- and write locks
@@ -121,40 +85,103 @@ pub fn (mut m RwMutex) w_unlock() {
 }
 
 pub fn (mut m Mutex) destroy() {
-	if m.state == .waiting {
-		m.unlock() // unlock mutex before destroying
-	}
-	C.CloseHandle(m.mx) // destroy mutex
-	m.state = .destroyed // setting up reference to invalid state
+	// nothing to do
 }
 
 [inline]
-pub fn new_semaphore() Semaphore {
+pub fn new_semaphore() &Semaphore {
 	return new_semaphore_init(0)
 }
 
-pub fn new_semaphore_init(n u32) Semaphore {
-	return Semaphore{
-		sem: SHANDLE(C.CreateSemaphore(0, n, C.INT32_MAX, 0))
+pub fn new_semaphore_init(n u32) &Semaphore {
+	mut sem := &Semaphore{}
+	sem.init(n)
+	return sem
+}
+
+pub fn (mut sem Semaphore) init(n u32) {
+	C.atomic_store_u32(&sem.count, n)
+	C.InitializeSRWLock(&sem.mtx)
+	C.InitializeConditionVariable(&sem.cond)
+}
+
+pub fn (mut sem Semaphore) post() {
+	mut c := C.atomic_load_u32(&sem.count)
+	for c > 1 {
+		if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c+1) { return }
 	}
+	C.AcquireSRWLockExclusive(&sem.mtx)
+	c = C.atomic_fetch_add_u32(&sem.count, 1)
+	if c == 0 {
+		C.WakeConditionVariable(&sem.cond)
+	}
+	C.ReleaseSRWLockExclusive(&sem.mtx)
 }
 
-pub fn (s Semaphore) post() {
-	C.ReleaseSemaphore(s.sem, 1, 0)
+pub fn (mut sem Semaphore) wait() {
+	mut c := C.atomic_load_u32(&sem.count)
+	for c > 0 {
+		if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c-1) { return }
+	}
+	C.AcquireSRWLockExclusive(&sem.mtx)
+	c = C.atomic_load_u32(&sem.count)
+	for {
+		if c == 0 {
+			C.SleepConditionVariableSRW(&sem.cond, &sem.mtx, C.INFINITE, 0)
+			c = C.atomic_load_u32(&sem.count)
+		}
+		for c > 0 {
+			if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c-1) {
+				if c > 1 {
+					C.WakeConditionVariable(&sem.cond)
+				}
+				goto unlock
+			}
+		}
+	}
+unlock:
+	C.ReleaseSRWLockExclusive(&sem.mtx)
 }
 
-pub fn (s Semaphore) wait() {
-	C.WaitForSingleObject(s.sem, C.INFINITE)
+pub fn (mut sem Semaphore) try_wait() bool {
+	mut c := C.atomic_load_u32(&sem.count)
+	for c > 0 {
+		if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c-1) { return true }
+	}
+	return false
 }
 
-pub fn (s Semaphore) try_wait() bool {
-	return C.WaitForSingleObject(s.sem, 0) == 0
-}
-
-pub fn (s Semaphore) timed_wait(timeout time.Duration) bool {
-	return C.WaitForSingleObject(s.sem, timeout / time.millisecond) == 0
+pub fn (mut sem Semaphore) timed_wait(timeout time.Duration) bool {
+	mut c := C.atomic_load_u32(&sem.count)
+	for c > 0 {
+		if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c-1) { return true }
+	}
+	C.AcquireSRWLockExclusive(&sem.mtx)
+	t_ms := u32(timeout / time.millisecond)
+	mut res := 0
+	c = C.atomic_load_u32(&sem.count)
+	for {
+		if c == 0 {
+			res = C.SleepConditionVariableSRW(&sem.cond, &sem.mtx, t_ms, 0)
+			if res == 0 {
+				goto unlock
+			}
+			c = C.atomic_load_u32(&sem.count)
+		}
+		for c > 0 {
+			if C.atomic_compare_exchange_weak_u32(&sem.count, &c, c-1) {
+				if c > 1 {
+					C.WakeConditionVariable(&sem.cond)
+				}
+				goto unlock
+			}
+		}
+	}
+unlock:
+	C.ReleaseSRWLockExclusive(&sem.mtx)
+	return res != 0
 }
 
 pub fn (s Semaphore) destroy() bool {
-	return C.CloseHandle(s.sem) != 0
+	return true
 }
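On Windows the semaphore is now built from an `SRWLOCK` plus a `CONDITION_VARIABLE` instead of a kernel semaphore handle, so `destroy()` has nothing to release and `timed_wait()` goes through `SleepConditionVariableSRW`. From V code the three backends behave identically; for instance a caller can poll with a timeout instead of blocking forever (sketch only; it assumes `time.sleep_ms` for the artificial delay):

```v
import sync
import time

fn slow_worker(mut done sync.Semaphore) {
	time.sleep_ms(300) // pretend to do some work
	done.post()
}

fn main() {
	mut done := sync.new_semaphore()
	go slow_worker(mut done)
	for !done.timed_wait(50 * time.millisecond) {
		println('still waiting...')
	}
	println('worker finished')
}
```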
diff --git a/vlib/sync/waitgroup.v b/vlib/sync/waitgroup.v
index 29bca0b691..58d234d9d8 100644
--- a/vlib/sync/waitgroup.v
+++ b/vlib/sync/waitgroup.v
@@ -14,7 +14,7 @@ module sync
 struct WaitGroup {
 mut:
 	task_count       int // current task count
-	task_count_mutex &Mutex = &Mutex(0) // This mutex protects the task_count count in add()
+	task_count_mutex &Mutex // This mutex protects the task_count count in add()
 	wait_blocker     &Waiter = &Waiter(0) // This blocks the wait() until released by add()
 }
 
diff --git a/vlib/v/checker/tests/chan_ref.out b/vlib/v/checker/tests/chan_ref.out
index d120ce91a8..62637642e8 100644
--- a/vlib/v/checker/tests/chan_ref.out
+++ b/vlib/v/checker/tests/chan_ref.out
@@ -1,5 +1,5 @@
 vlib/v/checker/tests/chan_ref.vv:10:8: error: cannot push non-reference `St` on `chan &St`
-    8 | fn f(ch chan &St, sem sync.Semaphore) {
+    8 | fn f(ch chan &St, mut sem sync.Semaphore) {
     9 |     w := St{}
    10 |     ch <- w
       |           ^
diff --git a/vlib/v/checker/tests/chan_ref.vv b/vlib/v/checker/tests/chan_ref.vv
index f187781747..ad3dcec1a4 100644
--- a/vlib/v/checker/tests/chan_ref.vv
+++ b/vlib/v/checker/tests/chan_ref.vv
@@ -5,7 +5,7 @@ mut:
 	n int
 }
 
-fn f(ch chan &St, sem sync.Semaphore) {
+fn f(ch chan &St, mut sem sync.Semaphore) {
 	w := St{}
 	ch <- w
 	mut x := St{}
@@ -21,8 +21,8 @@ fn f(ch chan &St, sem sync.Semaphore) {
 
 fn main() {
 	c := chan &St{}
-	sem := sync.new_semaphore()
-	go f(c, sem)
+	mut sem := sync.new_semaphore()
+	go f(c, mut sem)
 	y := <-c
 	// this should fail
 	mut z := <-c
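The checker test above still expects the "cannot push non-reference" error; only the semaphore parameter became `mut`. For contrast, the accepted pattern pushes heap references onto a `chan &St`, roughly like this (sketch only, not part of the test suite):

```v
import sync

struct St {
mut:
	n int
}

fn f(ch chan &St, mut sem sync.Semaphore) {
	w := &St{n: 1} // a reference, so pushing it on `chan &St` passes the checker
	ch <- w
	sem.post()
}

fn main() {
	c := chan &St{cap: 1}
	mut sem := sync.new_semaphore()
	go f(c, mut sem)
	sem.wait()
}
```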
diff --git a/vlib/v/gen/array.v b/vlib/v/gen/array.v
index ee379ee9d0..91ebe19c11 100644
--- a/vlib/v/gen/array.v
+++ b/vlib/v/gen/array.v
@@ -17,7 +17,7 @@ fn (mut g Gen) array_init(it ast.ArrayInit) {
 	if g.is_shared {
 		mut shared_typ := it.typ.set_flag(.shared_f)
 		shared_styp = g.typ(shared_typ)
-		g.writeln('($shared_styp*)memdup(&($shared_styp){.val = ')
+		g.writeln('($shared_styp*)__dup_shared_array(&($shared_styp){.val = ')
 	} else {
 		g.write('($styp*)memdup(ADDR($styp, ')
 	}
@@ -95,7 +95,7 @@ fn (mut g Gen) array_init(it ast.ArrayInit) {
 	}
 	if is_amp {
 		if g.is_shared {
-			g.write(', .mtx = sync__new_rwmutex()}, sizeof($shared_styp))')
+			g.write('}, sizeof($shared_styp))')
 		} else {
 			g.write('), sizeof($styp))')
 		}
diff --git a/vlib/v/gen/cgen.v b/vlib/v/gen/cgen.v
index dc1a747a39..7342f8d6a3 100644
--- a/vlib/v/gen/cgen.v
+++ b/vlib/v/gen/cgen.v
@@ -324,6 +324,7 @@ pub fn cgen(files []ast.File, table &table.Table, pref &pref.Preferences) string
 	if g.shared_types.len > 0 {
 		b.writeln('\n// V shared types:')
 		b.write(g.shared_types.str())
+		b.write(c_concurrency_helpers)
 	}
 	if g.channel_definitions.len > 0 {
 		b.writeln('\n// V channel code:')
@@ -572,7 +573,12 @@ fn (mut g Gen) find_or_register_shared(t table.Type, base string) string {
 		return sh_typ
 	}
 	mtx_typ := 'sync__RwMutex'
-	g.shared_types.writeln('struct $sh_typ { $base val; $mtx_typ* mtx; };')
+	g.shared_types.writeln('struct $sh_typ { $base val; $mtx_typ mtx; };')
+	g.shared_types.writeln('static inline voidptr __dup${sh_typ}(voidptr src, int sz) {')
+	g.shared_types.writeln('\t$sh_typ* dest = memdup(src, sz);')
+	g.shared_types.writeln('\tsync__RwMutex_init(&dest->mtx);')
+	g.shared_types.writeln('\treturn dest;')
+	g.shared_types.writeln('}')
 	g.typedefs2.writeln('typedef struct $sh_typ $sh_typ;')
 	// println('registered shared type $sh_typ')
 	g.shareds << t_idx
@@ -2707,7 +2713,7 @@ fn (mut g Gen) expr(node ast.Expr) {
 			if g.is_shared {
 				mut shared_typ := node.typ.set_flag(.shared_f)
 				shared_styp = g.typ(shared_typ)
-				g.writeln('($shared_styp*)memdup(&($shared_styp){.val = ')
+				g.writeln('($shared_styp*)__dup_shared_map(&($shared_styp){.val = ')
 			} else {
 				styp = g.typ(node.typ)
 				g.write('($styp*)memdup(ADDR($styp, ')
@@ -2742,7 +2748,7 @@ fn (mut g Gen) expr(node ast.Expr) {
 				g.write('new_map_2(sizeof($key_typ_str), sizeof($value_typ_str), $hash_fn, $key_eq_fn, $clone_fn, $free_fn)')
 			}
 			if g.is_shared {
-				g.write(', .mtx = sync__new_rwmutex()}')
+				g.write('}')
 				if is_amp {
 					g.write(', sizeof($shared_styp))')
 				}
@@ -2763,7 +2769,7 @@ fn (mut g Gen) expr(node ast.Expr) {
 		}
 		ast.PostfixExpr {
 			if node.auto_locked != '' {
-				g.writeln('sync__RwMutex_w_lock($node.auto_locked->mtx);')
+				g.writeln('sync__RwMutex_w_lock(&$node.auto_locked->mtx);')
 			}
 			g.inside_map_postfix = true
 			g.expr(node.expr)
@@ -2771,7 +2777,7 @@ fn (mut g Gen) expr(node ast.Expr) {
 			g.write(node.op.str())
 			if node.auto_locked != '' {
 				g.writeln(';')
-				g.write('sync__RwMutex_w_unlock($node.auto_locked->mtx)')
+				g.write('sync__RwMutex_w_unlock(&$node.auto_locked->mtx)')
 			}
 		}
 		ast.PrefixExpr {
@@ -3022,7 +3028,7 @@ fn (mut g Gen) infix_expr(node ast.InfixExpr) {
 	// string + string, string == string etc
 	// g.infix_op = node.op
 	if node.auto_locked != '' {
-		g.writeln('sync__RwMutex_w_lock($node.auto_locked->mtx);')
+		g.writeln('sync__RwMutex_w_lock(&$node.auto_locked->mtx);')
 	}
 	left_type := g.unwrap_generic(node.left_type)
 	left_sym := g.table.get_type_symbol(left_type)
@@ -3412,7 +3418,7 @@ fn (mut g Gen) infix_expr(node ast.InfixExpr) {
 	}
 	if node.auto_locked != '' {
 		g.writeln(';')
-		g.write('sync__RwMutex_w_unlock($node.auto_locked->mtx)')
+		g.write('sync__RwMutex_w_unlock(&$node.auto_locked->mtx)')
 	}
 }
 
@@ -3423,7 +3429,7 @@ fn (mut g Gen) lock_expr(node ast.LockExpr) {
 		deref := if id.is_mut { '->' } else { '.' }
 		lock_prefix := if node.is_rlock { `r` } else { `w` }
 		lock_prefixes << lock_prefix // keep for unlock
-		g.writeln('sync__RwMutex_${lock_prefix:c}_lock($name${deref}mtx);')
+		g.writeln('sync__RwMutex_${lock_prefix:c}_lock(&$name${deref}mtx);')
 	}
 	g.stmts(node.stmts)
 	// unlock in reverse order
@@ -3432,7 +3438,7 @@ fn (mut g Gen) lock_expr(node ast.LockExpr) {
 		lock_prefix := lock_prefixes[i]
 		name := id.name
 		deref := if id.is_mut { '->' } else { '.' }
-		g.writeln('sync__RwMutex_${lock_prefix:c}_unlock($name${deref}mtx);')
+		g.writeln('sync__RwMutex_${lock_prefix:c}_unlock(&$name${deref}mtx);')
 	}
 }
 
@@ -4715,7 +4721,7 @@ fn (mut g Gen) struct_init(struct_init ast.StructInit) {
 		if g.is_shared {
 			mut shared_typ := struct_init.typ.set_flag(.shared_f)
 			shared_styp = g.typ(shared_typ)
-			g.writeln('($shared_styp*)memdup(&($shared_styp){.val = ($styp){')
+			g.writeln('($shared_styp*)__dup${shared_styp}(&($shared_styp){.val = ($styp){')
 		} else {
 			g.write('($styp*)memdup(&($styp){')
 		}
@@ -4728,6 +4734,7 @@ fn (mut g Gen) struct_init(struct_init ast.StructInit) {
 		}
 	} else {
 		if g.is_shared {
+			// TODO: non-ref shared should be forbidden
 			g.writeln('{.val = {')
 		} else if is_multiline {
 			g.writeln('($styp){')
@@ -4897,7 +4904,7 @@ fn (mut g Gen) struct_init(struct_init ast.StructInit) {
 	}
 	g.write('}')
 	if g.is_shared {
-		g.write(', .mtx = sync__new_rwmutex()}')
+		g.write('}')
 		if is_amp {
 			g.write(', sizeof($shared_styp))')
 		}
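Nothing changes at the V level: `shared` objects are still guarded with `lock`/`rlock`. The difference is only in the generated C, where the `sync__RwMutex` is now embedded in the shared wrapper struct and initialized by the per-type `__dup...` helper (or `__dup_shared_map`/`__dup_shared_array`) instead of being heap-allocated through `sync__new_rwmutex()`, and the lock calls pass `&...mtx`. A small program that exercises these code paths (illustrative only):

```v
struct Counter {
mut:
	hits int
}

fn main() {
	shared c := &Counter{}
	lock c { // emitted as sync__RwMutex_w_lock(&c->mtx) in the generated C
		c.hits++
	}
	rlock c { // read-only access takes the r-lock
		println(c.hits)
	}
}
```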
diff --git a/vlib/v/gen/cheaders.v b/vlib/v/gen/cheaders.v
index f7c39d88ec..23b57a27a0 100644
--- a/vlib/v/gen/cheaders.v
+++ b/vlib/v/gen/cheaders.v
@@ -15,6 +15,22 @@ const (
 #ifndef V_CURRENT_COMMIT_HASH
 	#define V_CURRENT_COMMIT_HASH "@@@"
 #endif
+'
+	c_concurrency_helpers = '
+typedef struct __shared_map __shared_map;
+struct __shared_map { map val; sync__RwMutex mtx; };
+static inline voidptr __dup_shared_map(voidptr src, int sz) {
+	__shared_map* dest = memdup(src, sz);
+	sync__RwMutex_init(&dest->mtx);
+	return dest;
+}
+typedef struct __shared_array __shared_array;
+struct __shared_array { array val; sync__RwMutex mtx; };
+static inline voidptr __dup_shared_array(voidptr src, int sz) {
+	__shared_array* dest = memdup(src, sz);
+	sync__RwMutex_init(&dest->mtx);
+	return dest;
+}
 '
 	c_common_macros = '
 #define EMPTY_VARG_INITIALIZATION 0
diff --git a/vlib/v/tests/autolock_array2_test.v b/vlib/v/tests/autolock_array2_test.v
index 75a552b6ab..fa650bb62b 100644
--- a/vlib/v/tests/autolock_array2_test.v
+++ b/vlib/v/tests/autolock_array2_test.v
@@ -4,7 +4,7 @@ const (
 	iterations_per_thread2 = 100000
 )
 
-fn inc_elements(shared foo []int, n int, sem sync.Semaphore) {
+fn inc_elements(shared foo []int, n int, mut sem sync.Semaphore) {
 	for _ in 0 .. iterations_per_thread2 {
 		foo[n]++
 	}
@@ -13,9 +13,9 @@ fn inc_elements(shared foo []int, n int, sem sync.Semaphore) {
 
 fn test_autolocked_array_2() {
 	shared abc := &[0, 0, 0]
-	sem := sync.new_semaphore()
-	go inc_elements(shared abc, 1, sem)
-	go inc_elements(shared abc, 2, sem)
+	mut sem := sync.new_semaphore()
+	go inc_elements(shared abc, 1, mut sem)
+	go inc_elements(shared abc, 2, mut sem)
 	for _ in 0 .. iterations_per_thread2 {
 		unsafe {
 			abc[2]++
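This test leans on V's auto-locking: an element update such as `foo[n]++` on a `shared` array is wrapped by the compiler in the write lock (now emitted as `sync__RwMutex_w_lock(&...)`), so the worker threads need no explicit `lock` block. A reduced sketch of the same idea with the result checked at the end (names and iteration counts are illustrative):

```v
import sync

fn add_one(shared a []int, mut done sync.Semaphore) {
	for _ in 0 .. 1000 {
		a[0]++ // auto-locked update of a shared array element
	}
	done.post()
}

fn main() {
	shared a := &[0]
	mut done := sync.new_semaphore()
	go add_one(shared a, mut done)
	go add_one(shared a, mut done)
	done.wait()
	done.wait()
	rlock a {
		assert a[0] == 2000
	}
}
```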
diff --git a/vlib/v/tests/semaphore_test.v b/vlib/v/tests/semaphore_test.v
index 5079a996a8..b354746484 100644
--- a/vlib/v/tests/semaphore_test.v
+++ b/vlib/v/tests/semaphore_test.v
@@ -4,7 +4,7 @@ const (
 	signals_per_thread = 100000
 )
 
-fn send_signals(sem sync.Semaphore, sem_end sync.Semaphore) {
+fn send_signals(mut sem sync.Semaphore, mut sem_end sync.Semaphore) {
 	for _ in 0 .. signals_per_thread {
 		sem.post()
 	}
@@ -12,10 +12,10 @@ fn send_signals(sem sync.Semaphore, sem_end sync.Semaphore) {
 }
 
 fn test_semaphores() {
-	sem := sync.new_semaphore()
-	sem_end := sync.new_semaphore()
-	go send_signals(sem, sem_end)
-	go send_signals(sem, sem_end)
+	mut sem := sync.new_semaphore()
+	mut sem_end := sync.new_semaphore()
+	go send_signals(mut sem, mut sem_end)
+	go send_signals(mut sem, mut sem_end)
 	for _ in 0 .. 2 * signals_per_thread {
 		sem.wait()
 	}
diff --git a/vlib/v/tests/semaphore_timed_test.v b/vlib/v/tests/semaphore_timed_test.v
index 62fadf9a48..6b5082f20b 100644
--- a/vlib/v/tests/semaphore_timed_test.v
+++ b/vlib/v/tests/semaphore_timed_test.v
@@ -1,7 +1,7 @@
 import sync
 import time
 
-fn run_forever(shared foo []int, sem sync.Semaphore) {
+fn run_forever(shared foo []int, mut sem sync.Semaphore) {
 	for {
 		foo[0]++
 	}
@@ -10,8 +10,8 @@ fn run_forever(shared foo []int, sem sync.Semaphore) {
 
 fn test_semaphore() {
 	shared abc := &[0]
-	sem := sync.new_semaphore()
-	go run_forever(shared abc, sem)
+	mut sem := sync.new_semaphore()
+	go run_forever(shared abc, mut sem)
 	for _ in 0 .. 1000 {
 		unsafe { abc[0]-- }
 	}
diff --git a/vlib/v/tests/shared_map_test.v b/vlib/v/tests/shared_map_test.v
index 726cbcb051..191b36f33a 100644
--- a/vlib/v/tests/shared_map_test.v
+++ b/vlib/v/tests/shared_map_test.v
@@ -1,6 +1,6 @@
 import sync
 
-fn incr(shared foo map[string]int, key string, sem sync.Semaphore) {
+fn incr(shared foo map[string]int, key string, mut sem sync.Semaphore) {
 	for _ in 0 .. 100000 {
 		lock foo {
 			foo[key] = foo[key] + 1
@@ -14,11 +14,11 @@ fn test_shared_array() {
 	lock foo {
 		foo['q'] = 20
 	}
-	sem := sync.new_semaphore()
-	go incr(shared foo, 'p', sem)
-	go incr(shared foo, 'q', sem)
-	go incr(shared foo, 'p', sem)
-	go incr(shared foo, 'q', sem)
+	mut sem := sync.new_semaphore()
+	go incr(shared foo, 'p', mut sem)
+	go incr(shared foo, 'q', mut sem)
+	go incr(shared foo, 'p', mut sem)
+	go incr(shared foo, 'q', mut sem)
 	for _ in 0 .. 50000 {
 		lock foo {
 			foo['p'] -= 2