sync/channels: implement `close()` method (#6098)

pull/6100/head
Uwe Krüger 2020-08-10 06:22:20 +02:00 committed by GitHub
parent fce106cf83
commit 20a65cf9c8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 267 additions and 20 deletions

View File

@ -236,15 +236,29 @@ ch.pop(&m)
ch2.pop(&y)
```
A channel can be closed to indicate that no further objects can be pushed. Any attempt
to do so will then result in a runtime panic. The `pop()` method will return immediately `false`
if the associated channel has been closed and the buffer is empty.
```v
ch.close()
...
if ch.pop(&m) {
println('got $m')
} else {
println('channel has been closed')
}
```
The select call is somewhat tricky. The `channel_select()` function needs three arrays that
contain the channels, the directions (pop/push) and the object references and
a timeout of type `time.Duration` (or `0` to wait unlimited) as parameters. It returns the
index of the object that was pushed or popped or `-1` for timeout.
```v
mut chans := [ch, ch2] // the channels to monitor
directions := [false, false] // `true` means push, `false` means pop
mut objs := [voidptr(&m), &y] // the objects to push or pop
mut chans := [ch, ch2] // the channels to monitor
directions := [sync.Direction.pop, .pop] // .push or .pop
mut objs := [voidptr(&m), &y] // the objects to push or pop
// idx contains the index of the object that was pushed or popped, -1 means timeout occured
idx := sync.channel_select(mut chans, directions, mut objs, 0) // wait unlimited
@ -261,3 +275,4 @@ match idx {
}
}
```
If all channels have been closed `-2` is returned as index.

View File

@ -0,0 +1,80 @@
import sync
import time
fn do_rec(mut ch sync.Channel, mut resch sync.Channel) {
mut sum := i64(0)
for {
mut a := 0
if !ch.pop(&a) {
break
}
sum += a
}
println(sum)
resch.push(&sum)
}
fn do_send(mut ch sync.Channel) {
for i in 0 .. 8000 {
ch.push(&i)
}
ch.close()
}
fn test_channel_close_buffered_multi() {
mut ch := sync.new_channel<int>(0)
mut resch := sync.new_channel<i64>(100)
go do_rec(mut ch, mut resch)
go do_rec(mut ch, mut resch)
go do_rec(mut ch, mut resch)
go do_rec(mut ch, mut resch)
go do_send(mut ch)
mut sum := i64(0)
for _ in 0 .. 4 {
mut r := i64(0)
resch.pop(&r)
sum += r
}
assert sum == i64(8000) * (8000 - 1) / 2
}
fn test_channel_close_unbuffered_multi() {
mut ch := sync.new_channel<int>(0)
mut resch := sync.new_channel<i64>(100)
go do_rec(mut ch, mut resch)
go do_rec(mut ch, mut resch)
go do_rec(mut ch, mut resch)
go do_rec(mut ch, mut resch)
go do_send(mut ch)
mut sum := i64(0)
for _ in 0 .. 4 {
mut r := i64(0)
resch.pop(&r)
sum += r
}
assert sum == i64(8000) * (8000 - 1) / 2
}
fn test_channel_close_buffered() {
mut ch := sync.new_channel<int>(0)
mut resch := sync.new_channel<i64>(100)
go do_rec(mut ch, mut resch)
go do_send(mut ch)
mut sum := i64(0)
mut r := i64(0)
resch.pop(&r)
sum += r
assert sum == i64(8000) * (8000 - 1) / 2
}
fn test_channel_close_unbuffered() {
mut ch := sync.new_channel<int>(0)
mut resch := sync.new_channel<i64>(100)
go do_rec(mut ch, mut resch)
go do_send(mut ch)
mut sum := i64(0)
mut r := i64(0)
resch.pop(&r)
sum += r
assert sum == i64(8000) * (8000 - 1) / 2
}

View File

@ -76,6 +76,12 @@ enum Direction {
push
}
enum TransactionState {
success
not_ready // push()/pop() would have to wait, but no_block was requested
closed
}
struct Channel {
writesem Semaphore // to wake thread that wanted to write, but buffer was full
readsem Semaphore // to wake thread that wanted to read, but buffer was empty
@ -99,6 +105,7 @@ mut: // atomic
read_subscriber &Subscription
write_sub_mtx u16
read_sub_mtx u16
closed u16
}
pub fn new_channel<T>(n u32) &Channel {
@ -119,11 +126,42 @@ pub fn new_channel<T>(n u32) &Channel {
}
}
pub fn (mut ch Channel) push(src voidptr) {
ch.try_push(src, false)
pub fn (mut ch Channel) close() {
C.atomic_store_u16(&ch.closed, 1)
mut nulladr := voidptr(0)
for !C.atomic_compare_exchange_weak_ptr(&ch.adr_written, &nulladr, voidptr(-1)) {
nulladr = voidptr(0)
}
ch.readsem_im.post()
ch.readsem.post()
mut null16 := u16(0)
for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) {
null16 = u16(0)
}
if ch.read_subscriber != voidptr(0) {
ch.read_subscriber.sem.post()
}
C.atomic_store_u16(&ch.read_sub_mtx, u16(0))
null16 = u16(0)
for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) {
null16 = u16(0)
}
if ch.write_subscriber != voidptr(0) {
ch.write_subscriber.sem.post()
}
C.atomic_store_u16(&ch.write_sub_mtx, u16(0))
}
fn (mut ch Channel) try_push(src voidptr, no_block bool) bool {
pub fn (mut ch Channel) push(src voidptr) {
if ch.try_push(src, false) == .closed {
panic('push on closed channel')
}
}
fn (mut ch Channel) try_push(src voidptr, no_block bool) TransactionState {
if C.atomic_load_u16(&ch.closed) != 0 {
return .closed
}
spinloops_, spinloops_sem_ := if no_block { 1, 1 } else { spinloops, spinloops_sem }
mut have_swapped := false
for {
@ -138,11 +176,11 @@ fn (mut ch Channel) try_push(src voidptr, no_block bool) bool {
nulladr = voidptr(0)
}
ch.readsem_im.post()
return true
return .success
}
}
if no_block && ch.queue_length == 0 {
return false
return .not_ready
}
// get token to read
for _ in 0 .. spinloops_sem_ {
@ -206,10 +244,14 @@ fn (mut ch Channel) try_push(src voidptr, no_block bool) bool {
} else {
// this semaphore was not for us - repost in
ch.writesem_im.post()
if src2 == voidptr(-1) {
ch.readsem.post()
return .closed
}
src2 = src
}
}
return true
return .success
} else {
// buffered channel
mut space_in_queue := false
@ -257,7 +299,7 @@ fn (mut ch Channel) try_push(src voidptr, no_block bool) bool {
}
C.atomic_store_u16(&ch.read_sub_mtx, u16(0))
}
return true
return .success
} else {
ch.writesem.post()
}
@ -265,11 +307,11 @@ fn (mut ch Channel) try_push(src voidptr, no_block bool) bool {
}
}
pub fn (mut ch Channel) pop(dest voidptr) {
ch.try_pop(dest, false)
pub fn (mut ch Channel) pop(dest voidptr) bool {
return ch.try_pop(dest, false) == .success
}
fn (mut ch Channel) try_pop(dest voidptr, no_block bool) bool {
fn (mut ch Channel) try_pop(dest voidptr, no_block bool) TransactionState {
spinloops_, spinloops_sem_ := if no_block { 1, 1 } else { spinloops, spinloops_sem }
mut have_swapped := false
mut write_in_progress := false
@ -287,11 +329,11 @@ fn (mut ch Channel) try_pop(dest voidptr, no_block bool) bool {
nulladr = voidptr(0)
}
ch.writesem_im.post()
return true
return .success
}
}
if no_block {
return false
return if C.atomic_load_u16(&ch.closed) == 0 { TransactionState.not_ready } else { TransactionState.closed }
}
}
// get token to read
@ -303,7 +345,7 @@ fn (mut ch Channel) try_pop(dest voidptr, no_block bool) bool {
}
if !got_sem {
if no_block {
return false
return if C.atomic_load_u16(&ch.closed) == 0 { TransactionState.not_ready } else { TransactionState.closed }
}
ch.readsem.wait()
}
@ -354,7 +396,7 @@ fn (mut ch Channel) try_pop(dest voidptr, no_block bool) bool {
}
C.atomic_store_u16(&ch.write_sub_mtx, u16(0))
}
return true
return .success
}
}
// try to advertise `dest` as writable
@ -386,6 +428,9 @@ fn (mut ch Channel) try_pop(dest voidptr, no_block bool) bool {
if C.atomic_compare_exchange_strong_ptr(&ch.adr_written, &dest2, voidptr(0)) {
have_swapped = true
break
} else if dest2 == voidptr(-1) {
ch.readsem.post()
return .closed
}
dest2 = dest
}
@ -408,10 +453,14 @@ fn (mut ch Channel) try_pop(dest voidptr, no_block bool) bool {
} else {
// this semaphore was not for us - repost in
ch.readsem_im.post()
if dest2 == voidptr(-1) {
ch.readsem.post()
return .closed
}
dest2 = dest
}
}
return true
return .success
}
}
@ -454,23 +503,34 @@ pub fn channel_select(mut channels []&Channel, dir []Direction, mut objrefs []vo
mut event_idx := -1 // negative index means `timed out`
for {
rnd := rand.u32_in_range(0, u32(channels.len))
mut num_closed := 0
for j, _ in channels {
mut i := j + int(rnd)
if i >= channels.len {
i -= channels.len
}
if dir[i] == .push {
if channels[i].try_push(objrefs[i], true) {
stat := channels[i].try_push(objrefs[i], true)
if stat == .success {
event_idx = i
goto restore
} else if stat == .closed {
num_closed++
}
} else {
if channels[i].try_pop(objrefs[i], true) {
stat := channels[i].try_pop(objrefs[i], true)
if stat == .success {
event_idx = i
goto restore
} else if stat == .closed {
num_closed++
}
}
}
if num_closed == channels.len {
event_idx = -2
goto restore
}
if timeout > 0 {
remaining := timeout - stopwatch.elapsed()
if !sem.timed_wait(remaining) {

View File

@ -0,0 +1,92 @@
import sync
fn do_rec_i64(mut ch sync.Channel) {
mut sum := i64(0)
for i in 0 .. 300 {
if i == 200 {
ch.close()
}
mut a := i64(0)
if ch.pop(&a) {
sum += a
}
}
assert sum == 200 * (200 - 1) / 2
}
fn do_send_int(mut ch sync.Channel) {
for i in 0 .. 300 {
ch.push(&i)
}
ch.close()
}
fn do_send_byte(mut ch sync.Channel) {
for i in 0 .. 300 {
ii := byte(i)
ch.push(&ii)
}
ch.close()
}
fn do_send_i64(mut ch sync.Channel) {
for i in 0 .. 300 {
ii := i64(i)
ch.push(&ii)
}
ch.close()
}
fn test_select() {
mut chi := sync.new_channel<int>(0)
mut chl := sync.new_channel<i64>(1)
mut chb := sync.new_channel<byte>(10)
mut recch := sync.new_channel<i64>(0)
go do_rec_i64(mut recch)
go do_send_int(mut chi)
go do_send_byte(mut chb)
go do_send_i64(mut chl)
mut channels := [chi, recch, chl, chb]
directions := [sync.Direction.pop, .push, .pop, .pop]
mut sum := i64(0)
mut rl := i64(0)
mut ri := int(0)
mut rb := byte(0)
mut sl := i64(0)
mut objs := [voidptr(&ri), &sl, &rl, &rb]
for j in 0 .. 1101 {
idx := sync.channel_select(mut channels, directions, mut objs, 0)
match idx {
0 {
sum += ri
}
1 {
sl++
}
2 {
sum += rl
}
3 {
sum += rb
}
-2 {
// channel was closed - last item
assert j == 1100
}
else {
println('got $idx (timeout)')
assert false
}
}
if j == 1100 {
// check also in other direction
assert idx == -2
}
}
// Use Gauß' formula for the first 2 contributions
expected_sum := 2 * (300 * (300 - 1) / 2) +
// the 3rd contribution is `byte` and must be seen modulo 256
256 * (256 - 1) / 2 +
44 * (44 - 1) / 2
assert sum == expected_sum
}