From 32683ad6fd652888e6136b0d9ba8f6f58ae1e214 Mon Sep 17 00:00:00 2001 From: joe-conigliaro Date: Thu, 29 Aug 2019 18:48:03 +1000 Subject: [PATCH] sync: fix mutex on win & waitgroup (all os) update. fixes news_fetcher example on win (#1776) --- examples/news_fetcher.v | 7 ++- vlib/sync/sync_win.v | 110 ++++++++++++++++------------------------ vlib/sync/waitgroup.v | 30 +++++------ 3 files changed, 60 insertions(+), 87 deletions(-) diff --git a/examples/news_fetcher.v b/examples/news_fetcher.v index 6310323904..17f5eabcc9 100644 --- a/examples/news_fetcher.v +++ b/examples/news_fetcher.v @@ -25,15 +25,14 @@ mut: fn (f mut Fetcher) fetch() { for { - f.mu.lock() if f.cursor >= f.ids.len { - f.mu.unlock() return } id := f.ids[f.cursor] + f.mu.lock() f.cursor++ - cursor := f.cursor f.mu.unlock() + cursor := f.cursor resp := http.get('https://hacker-news.firebaseio.com/v0/item/${id}.json') or { println('failed to fetch data from /v0/item/${id}.json') exit(1) @@ -42,8 +41,8 @@ fn (f mut Fetcher) fetch() { println('failed to decode a story') exit(1) } - f.wg.done() println('#$cursor) $story.title | $story.url') + f.wg.done() } } diff --git a/vlib/sync/sync_win.v b/vlib/sync/sync_win.v index 1639599bdd..ebe7c3a6e5 100644 --- a/vlib/sync/sync_win.v +++ b/vlib/sync/sync_win.v @@ -5,91 +5,71 @@ module sync import os -// Unsafe pointer -type Pointer voidptr - // Mutex HANDLE type MHANDLE voidptr struct Mutex { mut: - mx MHANDLE // mutex handle - wstate u32 // wait state - cycle_wait i64 // waiting cycles (implemented only with atomic) - cycle_woken i64 // woken cycles ^ - reader_sem u32 // reader semarphone - writer_sem u32 // writer semaphones + mx MHANDLE // mutex handle + state MutexState // mutex state + cycle_wait i64 // waiting cycles (implemented only with atomic) + cycle_woken i64 // woken cycles ^ + reader_sem u32 // reader semarphone + writer_sem u32 // writer semaphones } +enum MutexState { + broken + waiting + released + abandoned + destroyed +} const ( - WAIT = u32(8) // Waiting mutex - WOKEN = u32(16) // Woken mutex - ABOND = u32(32) - BROKEN = u32(64) - DESTROYED = u32(0) -) - -const ( - INFINITY = 0xffffffff + INFINITE = 0xffffffff ) // Ref - https://docs.microsoft.com/en-us/windows/win32/api/synchapi/nf-synchapi-waitforsingleobject#return-value const ( - WAIT_ABANDONED = 0x00000080 - WAIT_IO_COMPLETION = 0x000000C0 - WAIT_OBJECT_0 = 0x00000000 - WAIT_TIMEOUT = 0x00000102 - WAIT_FAILED = 0xFFFFFFFF + WAIT_ABANDONED = 0x00000080 + WAIT_IO_COMPLETION = 0x000000C0 + WAIT_OBJECT_0 = 0x00000000 + WAIT_TIMEOUT = 0x00000102 + WAIT_FAILED = 0xFFFFFFFF ) pub fn (m mut Mutex) lock() { - // if mutex handle not initalized - if m.mx == MHANDLE(0) { - m.mx = C.CreateMutex(0, false, 0) - _pmhx := int(m.mx) - if (((_pmhx & 0xff) - 1) == 0) || (_pmhx == os.INVALID_HANDLE_VALUE) { - m.wstate = BROKEN // handle broken and mutex state are broken - return - } - } - state := C.WaitForSingleObject(m.mx, INFINITY) // infinity wait - // for { - // if (m.cycle_woken - 1) < 0 { - // break - // } - // if state&0x00000080 { - // continue // abondoned - // } - // m.cycle_wait++ - // } - match state { - WAIT_FAILED => { m.wstate = BROKEN } - WAIT_ABANDONED => { m.wstate = ABOND } - WAIT_OBJECT_0 => { m.wstate = WAIT & u32(0xff) } - } - // todo implement atomic counter + // if mutex handle not initalized + if isnil(m.mx) { + m.mx = C.CreateMutex(0, false, 0) + if isnil(m.mx) { + m.state = .broken // handle broken and mutex state are broken + return + } + } + state := C.WaitForSingleObject(m.mx, INFINITE) // infinite wait + m.state = match state { + WAIT_ABANDONED => { MutexState.abandoned } + WAIT_OBJECT_0 => { MutexState.waiting } + else => { MutexState.broken } + } } pub fn (m mut Mutex) unlock() { - _pmx := &m.mx - if _pmx != os.INVALID_HANDLE_VALUE { - if m.wstate == (WAIT & u32(0xff)) { - if C.ReleaseMutex(_pmx) != 0 { - m.wstate = WOKEN // woken up mutex - return - } - m.wstate = ABOND - return - } - } - m.wstate = BROKEN + if m.state == .waiting { + if C.ReleaseMutex(m.mx) != 0 { + m.state = .broken + return + } + } + m.state = .released } pub fn (m mut Mutex) destroy() { - if m.wstate == WAIT { - m.unlock() // unlock mutex before destroying - } - m.wstate = DESTROYED // setting up reference to invalid state - C.CloseHandle(m.mx) // destroy mutex + if m.state == .waiting { + m.unlock() // unlock mutex before destroying + } + C.CloseHandle(m.mx) // destroy mutex + m.state = .destroyed // setting up reference to invalid state } diff --git a/vlib/sync/waitgroup.v b/vlib/sync/waitgroup.v index 86aeba13b7..5eca30507c 100644 --- a/vlib/sync/waitgroup.v +++ b/vlib/sync/waitgroup.v @@ -6,32 +6,26 @@ module sync struct WaitGroup { mut: - mu Mutex - finished Mutex - active int + mu Mutex + active int } pub fn (wg mut WaitGroup) add(delta int) { - wg.mu.lock() - if wg.active == 0 { - wg.finished.lock() - } - wg.active += delta - if wg.active < 0 { - panic('Negative number of jobs in waitgroup') - } - if wg.active == 0 { - wg.finished.unlock() - } - wg.mu.unlock() + wg.mu.lock() + wg.active += delta + wg.mu.unlock() + if wg.active < 0 { + panic('Negative number of jobs in waitgroup') + } } pub fn (wg mut WaitGroup) done() { - wg.add(-1) + wg.add(-1) } pub fn (wg mut WaitGroup) wait() { - wg.finished.lock() - wg.finished.unlock() + for wg.active > 0 { + // waiting + } }