sync/channels: provide `.cap` and `.len()` (#6104)

pull/6112/head
Uwe Krüger 2020-08-11 17:52:13 +02:00 committed by GitHub
parent bc3b411b12
commit 433610b5c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 49 additions and 12 deletions

View File

@ -250,6 +250,15 @@ if ch.pop(&m) {
}
```
There are also methods `try_push()` and `try_pop()` which return immediatelly with the return value `.not_ready` if the transaction
cannot be performed without waiting. The return value is of type `sync.TransactionState` which can also be
`.success` or `.closed`.
To monitor a channel there is a method `len()` which returns the number of elements currently in the queue and the attribute
`cap` for the queue length. Please be aware that in general `channel.len() > 0` does not guarantee that the next
`pop()` will succeed without waiting, since other threads may already have "stolen" elements from the queue. Use `try_pop()` to
accomplish this kind of task.
The select call is somewhat tricky. The `channel_select()` function needs three arrays that
contain the channels, the directions (pop/push) and the object references and
a timeout of type `time.Duration` (`time.infinite` or `-1` to wait unlimited) as parameters. It returns the

View File

@ -0,0 +1,22 @@
import sync
const (
queue_len = 1000
queue_fill = 763
)
fn do_send(mut ch sync.Channel, fin sync.Semaphore) {
for i in 0 .. queue_fill {
ch.push(&i)
}
fin.post()
}
fn test_channel_len_cap() {
mut ch := sync.new_channel<int>(queue_len)
sem := sync.new_semaphore()
go do_send(mut ch, sem)
sem.wait()
assert ch.cap == queue_len
assert ch.len() == queue_fill
}

View File

@ -90,7 +90,6 @@ struct Channel {
ringbuf byteptr // queue for buffered channels
statusbuf byteptr // flags to synchronize write/read in ringbuf
objsize u32
queue_length u32 // in #objects
mut: // atomic
write_adr C.atomic_uintptr_t // if != NULL the next obj can be written here without wait
read_adr C.atomic_uintptr_t // if != NULL an obj can be read from here without wait
@ -106,6 +105,8 @@ mut: // atomic
write_sub_mtx u16
read_sub_mtx u16
closed u16
pub:
cap u32 // queue length in #objects
}
pub fn new_channel<T>(n u32) &Channel {
@ -116,7 +117,7 @@ pub fn new_channel<T>(n u32) &Channel {
writesem_im: new_semaphore()
readsem_im: new_semaphore()
objsize: st
queue_length: n
cap: n
write_free: n
read_avail: 0
ringbuf: if n > 0 { malloc(int(n * st)) } else { byteptr(0) }
@ -152,6 +153,11 @@ pub fn (mut ch Channel) close() {
C.atomic_store_u16(&ch.write_sub_mtx, u16(0))
}
[inline]
pub fn (mut ch Channel) len() int {
return int(C.atomic_load_u32(&ch.read_avail))
}
[inline]
pub fn (mut ch Channel) push(src voidptr) {
if ch.try_push_priv(src, false) == .closed {
@ -188,7 +194,7 @@ fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) TransactionState {
return .success
}
}
if no_block && ch.queue_length == 0 {
if no_block && ch.cap == 0 {
return .not_ready
}
// get token to read
@ -201,7 +207,7 @@ fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) TransactionState {
if !got_sem {
ch.writesem.wait()
}
if ch.queue_length == 0 {
if ch.cap == 0 {
// try to advertise current object as readable
mut read_in_progress := false
C.atomic_store_ptr(&ch.read_adr, src)
@ -275,8 +281,8 @@ fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) TransactionState {
mut wr_idx := C.atomic_load_u32(&ch.buf_elem_write_idx)
for {
mut new_wr_idx := wr_idx + 1
for new_wr_idx >= ch.queue_length {
new_wr_idx -= ch.queue_length
for new_wr_idx >= ch.cap {
new_wr_idx -= ch.cap
}
if C.atomic_compare_exchange_strong_u32(&ch.buf_elem_write_idx, &wr_idx, new_wr_idx) {
break
@ -335,7 +341,7 @@ fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) TransactionState {
mut write_in_progress := false
for {
mut got_sem := false
if ch.queue_length == 0 {
if ch.cap == 0 {
// unbuffered channel - first see if a `push()` has adversized
mut rdadr := C.atomic_load_ptr(&ch.read_adr)
for rdadr != C.NULL {
@ -367,7 +373,7 @@ fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) TransactionState {
}
ch.readsem.wait()
}
if ch.queue_length > 0 {
if ch.cap > 0 {
// try to get buffer token
mut obj_in_queue := false
mut rd_avail := C.atomic_load_u32(&ch.read_avail)
@ -381,8 +387,8 @@ fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) TransactionState {
mut rd_idx := C.atomic_load_u32(&ch.buf_elem_read_idx)
for {
mut new_rd_idx := rd_idx + 1
for new_rd_idx >= ch.queue_length {
new_rd_idx -= ch.queue_length
for new_rd_idx >= ch.cap {
new_rd_idx -= ch.cap
}
if C.atomic_compare_exchange_weak_u32(&ch.buf_elem_read_idx, &rd_idx, new_rd_idx) {
break
@ -419,7 +425,7 @@ fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) TransactionState {
}
// try to advertise `dest` as writable
C.atomic_store_ptr(&ch.write_adr, dest)
if ch.queue_length == 0 {
if ch.cap == 0 {
mut rdadr := C.atomic_load_ptr(&ch.read_adr)
if rdadr != C.NULL {
mut dest2 := dest
@ -431,7 +437,7 @@ fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) TransactionState {
}
}
}
if ch.queue_length == 0 && !write_in_progress {
if ch.cap == 0 && !write_in_progress {
mut null16 := u16(0)
for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) {
null16 = u16(0)