From 433610b5c01c1c9d13cbd8526cfad85e3ced515d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uwe=20Kr=C3=BCger?= <45282134+UweKrueger@users.noreply.github.com> Date: Tue, 11 Aug 2020 17:52:13 +0200 Subject: [PATCH] sync/channels: provide `.cap` and `.len()` (#6104) --- doc/upcoming.md | 9 +++++++++ vlib/sync/channel_fill_test.v | 22 ++++++++++++++++++++++ vlib/sync/channels.v | 30 ++++++++++++++++++------------ 3 files changed, 49 insertions(+), 12 deletions(-) create mode 100644 vlib/sync/channel_fill_test.v diff --git a/doc/upcoming.md b/doc/upcoming.md index ff881d1a21..c54117bb0c 100644 --- a/doc/upcoming.md +++ b/doc/upcoming.md @@ -250,6 +250,15 @@ if ch.pop(&m) { } ``` +There are also methods `try_push()` and `try_pop()` which return immediatelly with the return value `.not_ready` if the transaction +cannot be performed without waiting. The return value is of type `sync.TransactionState` which can also be +`.success` or `.closed`. + +To monitor a channel there is a method `len()` which returns the number of elements currently in the queue and the attribute +`cap` for the queue length. Please be aware that in general `channel.len() > 0` does not guarantee that the next +`pop()` will succeed without waiting, since other threads may already have "stolen" elements from the queue. Use `try_pop()` to +accomplish this kind of task. + The select call is somewhat tricky. The `channel_select()` function needs three arrays that contain the channels, the directions (pop/push) and the object references and a timeout of type `time.Duration` (`time.infinite` or `-1` to wait unlimited) as parameters. It returns the diff --git a/vlib/sync/channel_fill_test.v b/vlib/sync/channel_fill_test.v new file mode 100644 index 0000000000..7252dfe0ab --- /dev/null +++ b/vlib/sync/channel_fill_test.v @@ -0,0 +1,22 @@ +import sync + +const ( + queue_len = 1000 + queue_fill = 763 +) + +fn do_send(mut ch sync.Channel, fin sync.Semaphore) { + for i in 0 .. queue_fill { + ch.push(&i) + } + fin.post() +} + +fn test_channel_len_cap() { + mut ch := sync.new_channel(queue_len) + sem := sync.new_semaphore() + go do_send(mut ch, sem) + sem.wait() + assert ch.cap == queue_len + assert ch.len() == queue_fill +} diff --git a/vlib/sync/channels.v b/vlib/sync/channels.v index a3f6910771..e9d5bf19ad 100644 --- a/vlib/sync/channels.v +++ b/vlib/sync/channels.v @@ -90,7 +90,6 @@ struct Channel { ringbuf byteptr // queue for buffered channels statusbuf byteptr // flags to synchronize write/read in ringbuf objsize u32 - queue_length u32 // in #objects mut: // atomic write_adr C.atomic_uintptr_t // if != NULL the next obj can be written here without wait read_adr C.atomic_uintptr_t // if != NULL an obj can be read from here without wait @@ -106,6 +105,8 @@ mut: // atomic write_sub_mtx u16 read_sub_mtx u16 closed u16 +pub: + cap u32 // queue length in #objects } pub fn new_channel(n u32) &Channel { @@ -116,7 +117,7 @@ pub fn new_channel(n u32) &Channel { writesem_im: new_semaphore() readsem_im: new_semaphore() objsize: st - queue_length: n + cap: n write_free: n read_avail: 0 ringbuf: if n > 0 { malloc(int(n * st)) } else { byteptr(0) } @@ -152,6 +153,11 @@ pub fn (mut ch Channel) close() { C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) } +[inline] +pub fn (mut ch Channel) len() int { + return int(C.atomic_load_u32(&ch.read_avail)) +} + [inline] pub fn (mut ch Channel) push(src voidptr) { if ch.try_push_priv(src, false) == .closed { @@ -188,7 +194,7 @@ fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) TransactionState { return .success } } - if no_block && ch.queue_length == 0 { + if no_block && ch.cap == 0 { return .not_ready } // get token to read @@ -201,7 +207,7 @@ fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) TransactionState { if !got_sem { ch.writesem.wait() } - if ch.queue_length == 0 { + if ch.cap == 0 { // try to advertise current object as readable mut read_in_progress := false C.atomic_store_ptr(&ch.read_adr, src) @@ -275,8 +281,8 @@ fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) TransactionState { mut wr_idx := C.atomic_load_u32(&ch.buf_elem_write_idx) for { mut new_wr_idx := wr_idx + 1 - for new_wr_idx >= ch.queue_length { - new_wr_idx -= ch.queue_length + for new_wr_idx >= ch.cap { + new_wr_idx -= ch.cap } if C.atomic_compare_exchange_strong_u32(&ch.buf_elem_write_idx, &wr_idx, new_wr_idx) { break @@ -335,7 +341,7 @@ fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) TransactionState { mut write_in_progress := false for { mut got_sem := false - if ch.queue_length == 0 { + if ch.cap == 0 { // unbuffered channel - first see if a `push()` has adversized mut rdadr := C.atomic_load_ptr(&ch.read_adr) for rdadr != C.NULL { @@ -367,7 +373,7 @@ fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) TransactionState { } ch.readsem.wait() } - if ch.queue_length > 0 { + if ch.cap > 0 { // try to get buffer token mut obj_in_queue := false mut rd_avail := C.atomic_load_u32(&ch.read_avail) @@ -381,8 +387,8 @@ fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) TransactionState { mut rd_idx := C.atomic_load_u32(&ch.buf_elem_read_idx) for { mut new_rd_idx := rd_idx + 1 - for new_rd_idx >= ch.queue_length { - new_rd_idx -= ch.queue_length + for new_rd_idx >= ch.cap { + new_rd_idx -= ch.cap } if C.atomic_compare_exchange_weak_u32(&ch.buf_elem_read_idx, &rd_idx, new_rd_idx) { break @@ -419,7 +425,7 @@ fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) TransactionState { } // try to advertise `dest` as writable C.atomic_store_ptr(&ch.write_adr, dest) - if ch.queue_length == 0 { + if ch.cap == 0 { mut rdadr := C.atomic_load_ptr(&ch.read_adr) if rdadr != C.NULL { mut dest2 := dest @@ -431,7 +437,7 @@ fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) TransactionState { } } } - if ch.queue_length == 0 && !write_in_progress { + if ch.cap == 0 && !write_in_progress { mut null16 := u16(0) for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) { null16 = u16(0)