From bcda0eeadc278ddbccca9b1c85a6142caeee2465 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Uwe=20Kr=C3=BCger?= <45282134+UweKrueger@users.noreply.github.com> Date: Fri, 18 Dec 2020 23:02:29 +0100 Subject: [PATCH] sync: fix rare freeze on `select` (#7398) --- vlib/sync/channel_select_4_test.v | 43 +++++++++++++++++++++ vlib/sync/channel_select_5_test.v | 62 +++++++++++++++++++++++++++++++ vlib/sync/channels.v | 36 ++++++++---------- 3 files changed, 121 insertions(+), 20 deletions(-) create mode 100644 vlib/sync/channel_select_4_test.v create mode 100644 vlib/sync/channel_select_5_test.v diff --git a/vlib/sync/channel_select_4_test.v b/vlib/sync/channel_select_4_test.v new file mode 100644 index 0000000000..646e8ac7c6 --- /dev/null +++ b/vlib/sync/channel_select_4_test.v @@ -0,0 +1,43 @@ +fn do_rec_i64(ch chan i64, sumch chan i64) { + mut sum := i64(0) + for _ in 0 .. 30000 { + sum += <-ch + } + sumch <- sum +} + +fn do_send_int(ch chan int) { + for i in 0 .. 30000 { + ch <- i + } +} + +fn test_select() { + chi := chan int{cap: 10} + recch := chan i64{cap: 10} + chsum := chan i64{} + go do_rec_i64(recch, chsum) + go do_send_int(chi) + mut sum := i64(0) + mut sl := i64(0) + for _ in 0 .. 60000 + recch.cap { + select { + ri := <-chi { + sum += ri + } + recch <- sl { + sl++ + } + } + } + // Use Gauß' formula + expected_sum := i64(30000) * (30000 - 1) / 2 + assert sum == expected_sum + + mut sumrec := <- chsum + // Empty receive buffer + for _ in 0 .. recch.cap { + sumrec += <- recch + } + assert sumrec == i64(30000 + recch.cap) * (30000 + recch.cap - 1) / 2 +} diff --git a/vlib/sync/channel_select_5_test.v b/vlib/sync/channel_select_5_test.v new file mode 100644 index 0000000000..d3aa0d4709 --- /dev/null +++ b/vlib/sync/channel_select_5_test.v @@ -0,0 +1,62 @@ +fn do_rec_i64(ch chan i64, sumch chan i64) { + mut sum := i64(0) + for _ in 0 .. 10000 { + sum += <-ch + } + sumch <- sum +} + +fn do_send_int(ch chan int) { + for i in 0 .. 10000 { + ch <- i + } +} + +fn do_send_int2(ch chan int) { + for i in 10000 .. 20000 { + ch <- i + } +} + +fn do_send_int3(ch chan int) { + for i in 20000 .. 30000 { + ch <- i + } +} + +fn test_select() { + chi := chan int{cap: 10} + recch := chan i64{cap: 10} + chsum := chan i64{} + go do_rec_i64(recch, chsum) + go do_rec_i64(recch, chsum) + go do_rec_i64(recch, chsum) + go do_send_int(chi) + go do_send_int2(chi) + go do_send_int3(chi) + mut sum := i64(0) + mut rl := i64(0) + mut sl := i64(0) + for _ in 0 .. 60000 + recch.cap { + select { + ri := <-chi { + sum += ri + } + recch <- sl { + sl++ + } + } + } + // Use Gauß' formula + expected_sum := i64(30000) * (30000 - 1) / 2 + assert sum == expected_sum + + mut sumrec := <- chsum + sumrec += <- chsum + sumrec += <- chsum + // Empty receive buffer + for _ in 0 .. recch.cap { + sumrec += <- recch + } + assert sumrec == i64(30000 + recch.cap) * (30000 + recch.cap - 1) / 2 +} diff --git a/vlib/sync/channels.v b/vlib/sync/channels.v index fcf5438b90..c7dbf90005 100644 --- a/vlib/sync/channels.v +++ b/vlib/sync/channels.v @@ -304,18 +304,16 @@ fn (mut ch Channel) try_push_priv(src voidptr, no_block bool) ChanState { C.memcpy(wr_ptr, src, ch.objsize) } C.atomic_store_u16(status_adr, u16(BufferElemStat.written)) - old_read_avail := C.atomic_fetch_add_u32(&ch.read_avail, 1) + C.atomic_fetch_add_u32(&ch.read_avail, 1) ch.readsem.post() - if old_read_avail == 0 { - mut null16 := u16(0) - for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) { - null16 = u16(0) - } - if ch.read_subscriber != voidptr(0) { - ch.read_subscriber.sem.post() - } - C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) + mut null16 := u16(0) + for !C.atomic_compare_exchange_weak_u16(&ch.read_sub_mtx, &null16, u16(1)) { + null16 = u16(0) } + if ch.read_subscriber != voidptr(0) { + ch.read_subscriber.sem.post() + } + C.atomic_store_u16(&ch.read_sub_mtx, u16(0)) return .success } else { if no_block { @@ -421,18 +419,16 @@ fn (mut ch Channel) try_pop_priv(dest voidptr, no_block bool) ChanState { C.memcpy(dest, rd_ptr, ch.objsize) } C.atomic_store_u16(status_adr, u16(BufferElemStat.unused)) - old_write_free := C.atomic_fetch_add_u32(&ch.write_free, 1) + C.atomic_fetch_add_u32(&ch.write_free, 1) ch.writesem.post() - if old_write_free == 0 { - mut null16 := u16(0) - for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) { - null16 = u16(0) - } - if ch.write_subscriber != voidptr(0) { - ch.write_subscriber.sem.post() - } - C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) + mut null16 := u16(0) + for !C.atomic_compare_exchange_weak_u16(&ch.write_sub_mtx, &null16, u16(1)) { + null16 = u16(0) } + if ch.write_subscriber != voidptr(0) { + ch.write_subscriber.sem.post() + } + C.atomic_store_u16(&ch.write_sub_mtx, u16(0)) return .success } }