sync: fix mutex on win & waitgroup (all os) update. fixes news_fetcher example on win (#1776)
parent
4a506b0566
commit
32683ad6fd
|
@ -25,15 +25,14 @@ mut:
|
||||||
|
|
||||||
fn (f mut Fetcher) fetch() {
|
fn (f mut Fetcher) fetch() {
|
||||||
for {
|
for {
|
||||||
f.mu.lock()
|
|
||||||
if f.cursor >= f.ids.len {
|
if f.cursor >= f.ids.len {
|
||||||
f.mu.unlock()
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
id := f.ids[f.cursor]
|
id := f.ids[f.cursor]
|
||||||
|
f.mu.lock()
|
||||||
f.cursor++
|
f.cursor++
|
||||||
cursor := f.cursor
|
|
||||||
f.mu.unlock()
|
f.mu.unlock()
|
||||||
|
cursor := f.cursor
|
||||||
resp := http.get('https://hacker-news.firebaseio.com/v0/item/${id}.json') or {
|
resp := http.get('https://hacker-news.firebaseio.com/v0/item/${id}.json') or {
|
||||||
println('failed to fetch data from /v0/item/${id}.json')
|
println('failed to fetch data from /v0/item/${id}.json')
|
||||||
exit(1)
|
exit(1)
|
||||||
|
@ -42,8 +41,8 @@ fn (f mut Fetcher) fetch() {
|
||||||
println('failed to decode a story')
|
println('failed to decode a story')
|
||||||
exit(1)
|
exit(1)
|
||||||
}
|
}
|
||||||
f.wg.done()
|
|
||||||
println('#$cursor) $story.title | $story.url')
|
println('#$cursor) $story.title | $story.url')
|
||||||
|
f.wg.done()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,33 +5,29 @@
|
||||||
module sync
|
module sync
|
||||||
import os
|
import os
|
||||||
|
|
||||||
// Unsafe pointer
|
|
||||||
type Pointer voidptr
|
|
||||||
|
|
||||||
// Mutex HANDLE
|
// Mutex HANDLE
|
||||||
type MHANDLE voidptr
|
type MHANDLE voidptr
|
||||||
|
|
||||||
struct Mutex {
|
struct Mutex {
|
||||||
mut:
|
mut:
|
||||||
mx MHANDLE // mutex handle
|
mx MHANDLE // mutex handle
|
||||||
wstate u32 // wait state
|
state MutexState // mutex state
|
||||||
cycle_wait i64 // waiting cycles (implemented only with atomic)
|
cycle_wait i64 // waiting cycles (implemented only with atomic)
|
||||||
cycle_woken i64 // woken cycles ^
|
cycle_woken i64 // woken cycles ^
|
||||||
reader_sem u32 // reader semarphone
|
reader_sem u32 // reader semarphone
|
||||||
writer_sem u32 // writer semaphones
|
writer_sem u32 // writer semaphones
|
||||||
}
|
}
|
||||||
|
|
||||||
|
enum MutexState {
|
||||||
|
broken
|
||||||
|
waiting
|
||||||
|
released
|
||||||
|
abandoned
|
||||||
|
destroyed
|
||||||
|
}
|
||||||
|
|
||||||
const (
|
const (
|
||||||
WAIT = u32(8) // Waiting mutex
|
INFINITE = 0xffffffff
|
||||||
WOKEN = u32(16) // Woken mutex
|
|
||||||
ABOND = u32(32)
|
|
||||||
BROKEN = u32(64)
|
|
||||||
DESTROYED = u32(0)
|
|
||||||
)
|
|
||||||
|
|
||||||
const (
|
|
||||||
INFINITY = 0xffffffff
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Ref - https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-waitforsingleobject#return-value
|
// Ref - https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-waitforsingleobject#return-value
|
||||||
|
@ -45,51 +41,35 @@ const (
|
||||||
|
|
||||||
pub fn (m mut Mutex) lock() {
|
pub fn (m mut Mutex) lock() {
|
||||||
// if mutex handle not initalized
|
// if mutex handle not initalized
|
||||||
if m.mx == MHANDLE(0) {
|
if isnil(m.mx) {
|
||||||
m.mx = C.CreateMutex(0, false, 0)
|
m.mx = C.CreateMutex(0, false, 0)
|
||||||
_pmhx := int(m.mx)
|
if isnil(m.mx) {
|
||||||
if (((_pmhx & 0xff) - 1) == 0) || (_pmhx == os.INVALID_HANDLE_VALUE) {
|
m.state = .broken // handle broken and mutex state are broken
|
||||||
m.wstate = BROKEN // handle broken and mutex state are broken
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
state := C.WaitForSingleObject(m.mx, INFINITY) // infinity wait
|
state := C.WaitForSingleObject(m.mx, INFINITE) // infinite wait
|
||||||
// for {
|
m.state = match state {
|
||||||
// if (m.cycle_woken - 1) < 0 {
|
WAIT_ABANDONED => { MutexState.abandoned }
|
||||||
// break
|
WAIT_OBJECT_0 => { MutexState.waiting }
|
||||||
// }
|
else => { MutexState.broken }
|
||||||
// if state&0x00000080 {
|
|
||||||
// continue // abondoned
|
|
||||||
// }
|
|
||||||
// m.cycle_wait++
|
|
||||||
// }
|
|
||||||
match state {
|
|
||||||
WAIT_FAILED => { m.wstate = BROKEN }
|
|
||||||
WAIT_ABANDONED => { m.wstate = ABOND }
|
|
||||||
WAIT_OBJECT_0 => { m.wstate = WAIT & u32(0xff) }
|
|
||||||
}
|
}
|
||||||
// todo implement atomic counter
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn (m mut Mutex) unlock() {
|
pub fn (m mut Mutex) unlock() {
|
||||||
_pmx := &m.mx
|
if m.state == .waiting {
|
||||||
if _pmx != os.INVALID_HANDLE_VALUE {
|
if C.ReleaseMutex(m.mx) != 0 {
|
||||||
if m.wstate == (WAIT & u32(0xff)) {
|
m.state = .broken
|
||||||
if C.ReleaseMutex(_pmx) != 0 {
|
|
||||||
m.wstate = WOKEN // woken up mutex
|
|
||||||
return
|
|
||||||
}
|
|
||||||
m.wstate = ABOND
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
m.wstate = BROKEN
|
m.state = .released
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn (m mut Mutex) destroy() {
|
pub fn (m mut Mutex) destroy() {
|
||||||
if m.wstate == WAIT {
|
if m.state == .waiting {
|
||||||
m.unlock() // unlock mutex before destroying
|
m.unlock() // unlock mutex before destroying
|
||||||
}
|
}
|
||||||
m.wstate = DESTROYED // setting up reference to invalid state
|
|
||||||
C.CloseHandle(m.mx) // destroy mutex
|
C.CloseHandle(m.mx) // destroy mutex
|
||||||
|
m.state = .destroyed // setting up reference to invalid state
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,23 +7,16 @@ module sync
|
||||||
struct WaitGroup {
|
struct WaitGroup {
|
||||||
mut:
|
mut:
|
||||||
mu Mutex
|
mu Mutex
|
||||||
finished Mutex
|
|
||||||
active int
|
active int
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn (wg mut WaitGroup) add(delta int) {
|
pub fn (wg mut WaitGroup) add(delta int) {
|
||||||
wg.mu.lock()
|
wg.mu.lock()
|
||||||
if wg.active == 0 {
|
|
||||||
wg.finished.lock()
|
|
||||||
}
|
|
||||||
wg.active += delta
|
wg.active += delta
|
||||||
|
wg.mu.unlock()
|
||||||
if wg.active < 0 {
|
if wg.active < 0 {
|
||||||
panic('Negative number of jobs in waitgroup')
|
panic('Negative number of jobs in waitgroup')
|
||||||
}
|
}
|
||||||
if wg.active == 0 {
|
|
||||||
wg.finished.unlock()
|
|
||||||
}
|
|
||||||
wg.mu.unlock()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn (wg mut WaitGroup) done() {
|
pub fn (wg mut WaitGroup) done() {
|
||||||
|
@ -31,7 +24,8 @@ pub fn (wg mut WaitGroup) done() {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn (wg mut WaitGroup) wait() {
|
pub fn (wg mut WaitGroup) wait() {
|
||||||
wg.finished.lock()
|
for wg.active > 0 {
|
||||||
wg.finished.unlock()
|
// waiting
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue