builtin: add methods to builtin channels (#6303)
parent
246fe3bfb7
commit
b015033c53
|
@ -0,0 +1,20 @@
|
||||||
|
module builtin
|
||||||
|
|
||||||
|
enum ChanState {
|
||||||
|
success
|
||||||
|
not_ready // push()/pop() would have to wait, but no_block was requested
|
||||||
|
closed
|
||||||
|
}
|
||||||
|
|
||||||
|
// The following methods are only stubs. The real implementation
|
||||||
|
// is in `vlib/sync/channels.v`
|
||||||
|
|
||||||
|
pub fn (ch chan) close() {}
|
||||||
|
|
||||||
|
pub fn (ch chan) try_pop(obj voidptr) ChanState {
|
||||||
|
return .success
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn (ch chan) try_push(obj voidptr) ChanState {
|
||||||
|
return .success
|
||||||
|
}
|
|
@ -1,5 +1,3 @@
|
||||||
import sync
|
|
||||||
|
|
||||||
const (
|
const (
|
||||||
num_iterations = 10000
|
num_iterations = 10000
|
||||||
)
|
)
|
||||||
|
@ -34,6 +32,6 @@ fn test_channel_array_mut() {
|
||||||
chs[0] <- t
|
chs[0] <- t
|
||||||
t = <-chs[1]
|
t = <-chs[1]
|
||||||
}
|
}
|
||||||
(&sync.Channel(chs[0])).close()
|
chs[0].close()
|
||||||
assert t.n == 100 + num_iterations
|
assert t.n == 100 + num_iterations
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,53 @@
|
||||||
|
// Channel Benchmark
|
||||||
|
//
|
||||||
|
// `nobj` integers are sent thru a channel with queue length`buflen`
|
||||||
|
// using `nsend` sender threads and `nrec` receiver threads.
|
||||||
|
//
|
||||||
|
// The receive threads add all received numbers and send them to the
|
||||||
|
// main thread where the total sum is compare to the expected value.
|
||||||
|
|
||||||
|
const (
|
||||||
|
nsend = 2
|
||||||
|
nrec = 2
|
||||||
|
buflen = 100
|
||||||
|
nobj = 10000
|
||||||
|
objs_per_thread = 5000
|
||||||
|
)
|
||||||
|
|
||||||
|
fn do_rec(ch chan int, resch chan i64, n int) {
|
||||||
|
mut sum := i64(0)
|
||||||
|
for _ in 0 .. n {
|
||||||
|
mut r := 0
|
||||||
|
for ch.try_pop(mut r) != .success {}
|
||||||
|
sum += r
|
||||||
|
}
|
||||||
|
println(sum)
|
||||||
|
resch <- sum
|
||||||
|
}
|
||||||
|
|
||||||
|
fn do_send(ch chan int, start, end int) {
|
||||||
|
for i in start .. end {
|
||||||
|
for ch.try_push(i) != .success {}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn test_channel_polling() {
|
||||||
|
ch := chan int{cap: buflen}
|
||||||
|
resch := chan i64{}
|
||||||
|
for i in 0 .. nrec {
|
||||||
|
go do_rec(ch, resch, objs_per_thread)
|
||||||
|
}
|
||||||
|
mut n := nobj
|
||||||
|
for i in 0 .. nsend {
|
||||||
|
end := n
|
||||||
|
n -= objs_per_thread
|
||||||
|
go do_send(ch, n, end)
|
||||||
|
}
|
||||||
|
mut sum := i64(0)
|
||||||
|
for _ in 0 .. nrec {
|
||||||
|
sum += <-resch
|
||||||
|
}
|
||||||
|
// use sum formula by Gauß to calculate the expected result
|
||||||
|
expected_sum := i64(nobj)*(nobj-1)/2
|
||||||
|
assert sum == expected_sum
|
||||||
|
}
|
|
@ -76,12 +76,6 @@ enum Direction {
|
||||||
push
|
push
|
||||||
}
|
}
|
||||||
|
|
||||||
enum TransactionState {
|
|
||||||
success
|
|
||||||
not_ready // push()/pop() would have to wait, but no_block was requested
|
|
||||||
closed
|
|
||||||
}
|
|
||||||
|
|
||||||
struct Channel {
|
struct Channel {
|
||||||
writesem Semaphore // to wake thread that wanted to write, but buffer was full
|
writesem Semaphore // to wake thread that wanted to write, but buffer was full
|
||||||
readsem Semaphore // to wake thread that wanted to read, but buffer was empty
|
readsem Semaphore // to wake thread that wanted to read, but buffer was empty
|
||||||
|
@ -170,11 +164,11 @@ pub fn (mut ch Channel) push(src voidptr) {
|
||||||
}
|
}
|
||||||
|
|
||||||
[inline]
|
[inline]
|
||||||
pub fn (mut ch Channel) try_push(src voidptr) TransactionState {
|
pub fn (mut ch Channel) try_push(src voidptr) ChanState {
|
||||||
return ch.try_push_priv(src, false)
|
return ch.try_push_priv(src, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) TransactionState {
|
fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) ChanState {
|
||||||
if C.atomic_load_u16(&ch.closed) != 0 {
|
if C.atomic_load_u16(&ch.closed) != 0 {
|
||||||
return .closed
|
return .closed
|
||||||
}
|
}
|
||||||
|
@ -329,11 +323,11 @@ pub fn (mut ch Channel) pop(dest voidptr) bool {
|
||||||
}
|
}
|
||||||
|
|
||||||
[inline]
|
[inline]
|
||||||
pub fn (mut ch Channel) try_pop(dest voidptr) TransactionState {
|
pub fn (mut ch Channel) try_pop(dest voidptr) ChanState {
|
||||||
return ch.try_pop_priv(dest, false)
|
return ch.try_pop_priv(dest, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) TransactionState {
|
fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) ChanState {
|
||||||
spinloops_sem_, spinloops_ := if no_block { spinloops, spinloops_sem } else { 1, 1 }
|
spinloops_sem_, spinloops_ := if no_block { spinloops, spinloops_sem } else { 1, 1 }
|
||||||
mut have_swapped := false
|
mut have_swapped := false
|
||||||
mut write_in_progress := false
|
mut write_in_progress := false
|
||||||
|
@ -355,7 +349,11 @@ fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) TransactionState {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if no_block {
|
if no_block {
|
||||||
return if C.atomic_load_u16(&ch.closed) == 0 { TransactionState.not_ready } else { TransactionState.closed }
|
if C.atomic_load_u16(&ch.closed) == 0 {
|
||||||
|
return .not_ready
|
||||||
|
} else {
|
||||||
|
return .closed
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// get token to read
|
// get token to read
|
||||||
|
@ -367,7 +365,11 @@ fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) TransactionState {
|
||||||
}
|
}
|
||||||
if !got_sem {
|
if !got_sem {
|
||||||
if no_block {
|
if no_block {
|
||||||
return if C.atomic_load_u16(&ch.closed) == 0 { TransactionState.not_ready } else { TransactionState.closed }
|
if C.atomic_load_u16(&ch.closed) == 0 {
|
||||||
|
return .not_ready
|
||||||
|
} else {
|
||||||
|
return .closed
|
||||||
|
}
|
||||||
}
|
}
|
||||||
ch.readsem.wait()
|
ch.readsem.wait()
|
||||||
}
|
}
|
||||||
|
|
|
@ -391,6 +391,11 @@ fn (mut g Gen) method_call(node ast.CallExpr) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
mut name := util.no_dots('${receiver_type_name}_$node.name')
|
mut name := util.no_dots('${receiver_type_name}_$node.name')
|
||||||
|
if left_sym.kind == .chan {
|
||||||
|
if node.name in ['close', 'try_pop', 'try_push'] {
|
||||||
|
name = 'sync__Channel_$node.name'
|
||||||
|
}
|
||||||
|
}
|
||||||
// Check if expression is: arr[a..b].clone(), arr[a..].clone()
|
// Check if expression is: arr[a..b].clone(), arr[a..].clone()
|
||||||
// if so, then instead of calling array_clone(&array_slice(...))
|
// if so, then instead of calling array_clone(&array_slice(...))
|
||||||
// call array_clone_static(array_slice(...))
|
// call array_clone_static(array_slice(...))
|
||||||
|
|
Loading…
Reference in New Issue