diff --git a/doc/upcoming.md b/doc/upcoming.md index 867da4eeca..29030861a8 100644 --- a/doc/upcoming.md +++ b/doc/upcoming.md @@ -236,15 +236,29 @@ ch.pop(&m) ch2.pop(&y) ``` +A channel can be closed to indicate that no further objects can be pushed. Any attempt +to do so will then result in a runtime panic. The `pop()` method will return immediately `false` +if the associated channel has been closed and the buffer is empty. + +```v +ch.close() +... +if ch.pop(&m) { + println('got $m') +} else { + println('channel has been closed') +} +``` + The select call is somewhat tricky. The `channel_select()` function needs three arrays that contain the channels, the directions (pop/push) and the object references and a timeout of type `time.Duration` (or `0` to wait unlimited) as parameters. It returns the index of the object that was pushed or popped or `-1` for timeout. ```v -mut chans := [ch, ch2] // the channels to monitor -directions := [false, false] // `true` means push, `false` means pop -mut objs := [voidptr(&m), &y] // the objects to push or pop +mut chans := [ch, ch2] // the channels to monitor +directions := [sync.Direction.pop, .pop] // .push or .pop +mut objs := [voidptr(&m), &y] // the objects to push or pop // idx contains the index of the object that was pushed or popped, -1 means timeout occured idx := sync.channel_select(mut chans, directions, mut objs, 0) // wait unlimited @@ -261,3 +275,4 @@ match idx { } } ``` +If all channels have been closed `-2` is returned as index. diff --git a/vlib/sync/channel_close_test.v b/vlib/sync/channel_close_test.v new file mode 100644 index 0000000000..2bfc2d7d01 --- /dev/null +++ b/vlib/sync/channel_close_test.v @@ -0,0 +1,80 @@ +import sync +import time + +fn do_rec(mut ch sync.Channel, mut resch sync.Channel) { + mut sum := i64(0) + for { + mut a := 0 + if !ch.pop(&a) { + break + } + sum += a + } + println(sum) + resch.push(&sum) +} + +fn do_send(mut ch sync.Channel) { + for i in 0 .. 8000 { + ch.push(&i) + } + ch.close() +} + +fn test_channel_close_buffered_multi() { + mut ch := sync.new_channel(0) + mut resch := sync.new_channel(100) + go do_rec(mut ch, mut resch) + go do_rec(mut ch, mut resch) + go do_rec(mut ch, mut resch) + go do_rec(mut ch, mut resch) + go do_send(mut ch) + mut sum := i64(0) + for _ in 0 .. 4 { + mut r := i64(0) + resch.pop(&r) + sum += r + } + assert sum == i64(8000) * (8000 - 1) / 2 +} + +fn test_channel_close_unbuffered_multi() { + mut ch := sync.new_channel(0) + mut resch := sync.new_channel(100) + go do_rec(mut ch, mut resch) + go do_rec(mut ch, mut resch) + go do_rec(mut ch, mut resch) + go do_rec(mut ch, mut resch) + go do_send(mut ch) + mut sum := i64(0) + for _ in 0 .. 4 { + mut r := i64(0) + resch.pop(&r) + sum += r + } + assert sum == i64(8000) * (8000 - 1) / 2 +} + +fn test_channel_close_buffered() { + mut ch := sync.new_channel(0) + mut resch := sync.new_channel(100) + go do_rec(mut ch, mut resch) + go do_send(mut ch) + mut sum := i64(0) + mut r := i64(0) + resch.pop(&r) + sum += r + assert sum == i64(8000) * (8000 - 1) / 2 +} + +fn test_channel_close_unbuffered() { + mut ch := sync.new_channel(0) + mut resch := sync.new_channel(100) + go do_rec(mut ch, mut resch) + go do_send(mut ch) + mut sum := i64(0) + mut r := i64(0) + resch.pop(&r) + sum += r + assert sum == i64(8000) * (8000 - 1) / 2 +} diff --git a/vlib/sync/channels.v b/vlib/sync/channels.v index 6f0f76d21f..4dfb765065 100644 --- a/vlib/sync/channels.v +++ b/vlib/sync/channels.v @@ -76,6 +76,12 @@ enum Direction { push } +enum TransactionState { + success + not_ready // push()/pop() would have to wait, but no_block was requested + closed +} + struct Channel { writesem Semaphore // to wake thread that wanted to write, but buffer was full readsem Semaphore // to wake thread that wanted to read, but buffer was empty @@ -99,6 +105,7 @@ mut: // atomic read_subscriber &Subscription write_sub_mtx u16 read_sub_mtx u16 + closed u16 } pub fn new_channel(n u32) &Channel { @@ -119,11 +126,42 @@ pub fn new_channel(n u32) &Channel { } } -pub fn (mut ch Channel) push(src voidptr) { - ch.try_push(src, false) +pub fn (mut ch Channel) close() { + C.atomic_store_u16(&ch.closed, 1) + mut nulladr := voidptr(0) + for !C.atomic_compare_exchange_weak_ptr(&ch.adr_written, &nulladr, voidptr(-1)) { + nulladr = voidptr(0) + } + ch.readsem_im.post() + ch.readsem.post() + mut null16 := u16(0) + for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) { + null16 = u16(0) + } + if ch.read_subscriber != voidptr(0) { + ch.read_subscriber.sem.post() + } + C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) + null16 = u16(0) + for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) { + null16 = u16(0) + } + if ch.write_subscriber != voidptr(0) { + ch.write_subscriber.sem.post() + } + C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) } -fn (mut ch Channel) try_push(src voidptr, no_block bool) bool { +pub fn (mut ch Channel) push(src voidptr) { + if ch.try_push(src, false) == .closed { + panic('push on closed channel') + } +} + +fn (mut ch Channel) try_push(src voidptr, no_block bool) TransactionState { + if C.atomic_load_u16(&ch.closed) != 0 { + return .closed + } spinloops_, spinloops_sem_ := if no_block { 1, 1 } else { spinloops, spinloops_sem } mut have_swapped := false for { @@ -138,11 +176,11 @@ fn (mut ch Channel) try_push(src voidptr, no_block bool) bool { nulladr = voidptr(0) } ch.readsem_im.post() - return true + return .success } } if no_block && ch.queue_length == 0 { - return false + return .not_ready } // get token to read for _ in 0 .. spinloops_sem_ { @@ -206,10 +244,14 @@ fn (mut ch Channel) try_push(src voidptr, no_block bool) bool { } else { // this semaphore was not for us - repost in ch.writesem_im.post() + if src2 == voidptr(-1) { + ch.readsem.post() + return .closed + } src2 = src } } - return true + return .success } else { // buffered channel mut space_in_queue := false @@ -257,7 +299,7 @@ fn (mut ch Channel) try_push(src voidptr, no_block bool) bool { } C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) } - return true + return .success } else { ch.writesem.post() } @@ -265,11 +307,11 @@ fn (mut ch Channel) try_push(src voidptr, no_block bool) bool { } } -pub fn (mut ch Channel) pop(dest voidptr) { - ch.try_pop(dest, false) +pub fn (mut ch Channel) pop(dest voidptr) bool { + return ch.try_pop(dest, false) == .success } -fn (mut ch Channel) try_pop(dest voidptr, no_block bool) bool { +fn (mut ch Channel) try_pop(dest voidptr, no_block bool) TransactionState { spinloops_, spinloops_sem_ := if no_block { 1, 1 } else { spinloops, spinloops_sem } mut have_swapped := false mut write_in_progress := false @@ -287,11 +329,11 @@ fn (mut ch Channel) try_pop(dest voidptr, no_block bool) bool { nulladr = voidptr(0) } ch.writesem_im.post() - return true + return .success } } if no_block { - return false + return if C.atomic_load_u16(&ch.closed) == 0 { TransactionState.not_ready } else { TransactionState.closed } } } // get token to read @@ -303,7 +345,7 @@ fn (mut ch Channel) try_pop(dest voidptr, no_block bool) bool { } if !got_sem { if no_block { - return false + return if C.atomic_load_u16(&ch.closed) == 0 { TransactionState.not_ready } else { TransactionState.closed } } ch.readsem.wait() } @@ -354,7 +396,7 @@ fn (mut ch Channel) try_pop(dest voidptr, no_block bool) bool { } C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) } - return true + return .success } } // try to advertise `dest` as writable @@ -386,6 +428,9 @@ fn (mut ch Channel) try_pop(dest voidptr, no_block bool) bool { if C.atomic_compare_exchange_strong_ptr(&ch.adr_written, &dest2, voidptr(0)) { have_swapped = true break + } else if dest2 == voidptr(-1) { + ch.readsem.post() + return .closed } dest2 = dest } @@ -408,10 +453,14 @@ fn (mut ch Channel) try_pop(dest voidptr, no_block bool) bool { } else { // this semaphore was not for us - repost in ch.readsem_im.post() + if dest2 == voidptr(-1) { + ch.readsem.post() + return .closed + } dest2 = dest } } - return true + return .success } } @@ -454,23 +503,34 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo mut event_idx := -1 // negative index means `timed out` for { rnd := rand.u32_in_range(0, u32(channels.len)) + mut num_closed := 0 for j, _ in channels { mut i := j + int(rnd) if i >= channels.len { i -= channels.len } if dir[i] == .push { - if channels[i].try_push(objrefs[i], true) { + stat := channels[i].try_push(objrefs[i], true) + if stat == .success { event_idx = i goto restore + } else if stat == .closed { + num_closed++ } } else { - if channels[i].try_pop(objrefs[i], true) { + stat := channels[i].try_pop(objrefs[i], true) + if stat == .success { event_idx = i goto restore + } else if stat == .closed { + num_closed++ } } } + if num_closed == channels.len { + event_idx = -2 + goto restore + } if timeout > 0 { remaining := timeout - stopwatch.elapsed() if !sem.timed_wait(remaining) { diff --git a/vlib/sync/select_close_test.v b/vlib/sync/select_close_test.v new file mode 100644 index 0000000000..b4ec30ed21 --- /dev/null +++ b/vlib/sync/select_close_test.v @@ -0,0 +1,92 @@ +import sync + +fn do_rec_i64(mut ch sync.Channel) { + mut sum := i64(0) + for i in 0 .. 300 { + if i == 200 { + ch.close() + } + mut a := i64(0) + if ch.pop(&a) { + sum += a + } + } + assert sum == 200 * (200 - 1) / 2 +} + +fn do_send_int(mut ch sync.Channel) { + for i in 0 .. 300 { + ch.push(&i) + } + ch.close() +} + +fn do_send_byte(mut ch sync.Channel) { + for i in 0 .. 300 { + ii := byte(i) + ch.push(&ii) + } + ch.close() +} + +fn do_send_i64(mut ch sync.Channel) { + for i in 0 .. 300 { + ii := i64(i) + ch.push(&ii) + } + ch.close() +} + +fn test_select() { + mut chi := sync.new_channel(0) + mut chl := sync.new_channel(1) + mut chb := sync.new_channel(10) + mut recch := sync.new_channel(0) + go do_rec_i64(mut recch) + go do_send_int(mut chi) + go do_send_byte(mut chb) + go do_send_i64(mut chl) + mut channels := [chi, recch, chl, chb] + directions := [sync.Direction.pop, .push, .pop, .pop] + mut sum := i64(0) + mut rl := i64(0) + mut ri := int(0) + mut rb := byte(0) + mut sl := i64(0) + mut objs := [voidptr(&ri), &sl, &rl, &rb] + for j in 0 .. 1101 { + idx := sync.channel_select(mut channels, directions, mut objs, 0) + match idx { + 0 { + sum += ri + } + 1 { + sl++ + } + 2 { + sum += rl + } + 3 { + sum += rb + } + -2 { + // channel was closed - last item + assert j == 1100 + } + else { + println('got $idx (timeout)') + assert false + } + } + if j == 1100 { + // check also in other direction + assert idx == -2 + } + } + // Use Gauß' formula for the first 2 contributions + expected_sum := 2 * (300 * (300 - 1) / 2) + + // the 3rd contribution is `byte` and must be seen modulo 256 + 256 * (256 - 1) / 2 + + 44 * (44 - 1) / 2 + assert sum == expected_sum +}