From e328b1d2925b511224823d652c92e748bd5a235c Mon Sep 17 00:00:00 2001 From: Miccah Date: Mon, 14 Jun 2021 20:44:31 -0500 Subject: [PATCH] os: create epoll wrapper (#10404) --- vlib/os/notify/backend_default.c.v | 6 + vlib/os/notify/backend_linux.c.v | 200 +++++++++++++++++++++++++++++ vlib/os/notify/notify.v | 35 +++++ vlib/os/notify/notify_test.v | 156 ++++++++++++++++++++++ 4 files changed, 397 insertions(+) create mode 100644 vlib/os/notify/backend_default.c.v create mode 100644 vlib/os/notify/backend_linux.c.v create mode 100644 vlib/os/notify/notify.v create mode 100644 vlib/os/notify/notify_test.v diff --git a/vlib/os/notify/backend_default.c.v b/vlib/os/notify/backend_default.c.v new file mode 100644 index 0000000000..1a35c5087e --- /dev/null +++ b/vlib/os/notify/backend_default.c.v @@ -0,0 +1,6 @@ +module notify + +// Implement the API +pub fn new() ?FdNotifier { + panic('unsupported') +} diff --git a/vlib/os/notify/backend_linux.c.v b/vlib/os/notify/backend_linux.c.v new file mode 100644 index 0000000000..35847d848f --- /dev/null +++ b/vlib/os/notify/backend_linux.c.v @@ -0,0 +1,200 @@ +module notify + +import time +import os + +#include + +struct C.epoll_event { + events u32 + data C.epoll_data_t +} + +[typedef] +union C.epoll_data_t { + ptr voidptr + fd int + u32 u32 + u64 u64 +} + +fn C.epoll_create1(int) int + +fn C.epoll_ctl(int, int, int, &C.epoll_event) int + +fn C.epoll_wait(int, &C.epoll_event, int, int) int + +[noinit] +struct EpollNotifier { + epoll_fd int +mut: + num_watching int // heuristic + events []C.epoll_event +} + +[noinit] +struct EpollEvent { +pub: + fd int + kind FdEventType +} + +pub fn new() ?FdNotifier { + fd := C.epoll_create1(0) // 0 indicates default behavior + if fd == -1 { + return error(os.posix_get_error_msg(C.errno)) + } + // Needed to circumvent V limitations + x := &EpollNotifier{ + epoll_fd: fd + } + return x +} + +const ( + epoll_read = u32(C.EPOLLIN) + epoll_write = u32(C.EPOLLOUT) + epoll_peer_hangup = u32(C.EPOLLRDHUP) + epoll_exception = u32(C.EPOLLPRI) + epoll_error = u32(C.EPOLLERR) + epoll_hangup = u32(C.EPOLLHUP) + epoll_edge_trigger = u32(C.EPOLLET) + epoll_one_shot = u32(C.EPOLLONESHOT) + epoll_wake_up = u32(C.EPOLLWAKEUP) + epoll_exclusive = u32(C.EPOLLEXCLUSIVE) +) + +fn (mut en EpollNotifier) ctl(fd int, op int, mask u32) ? { + event := C.epoll_event{ + events: mask + data: C.epoll_data_t{ + fd: fd + } + } + if C.epoll_ctl(en.epoll_fd, op, fd, &event) == -1 { + return error(os.posix_get_error_msg(C.errno)) + } +} + +fn (mut en EpollNotifier) add(fd int, events FdEventType, conf ...FdConfigFlags) ? { + mask := flags_to_mask(events, ...conf) + en.ctl(fd, C.EPOLL_CTL_ADD, mask) ? + en.num_watching++ +} + +fn (mut en EpollNotifier) modify(fd int, events FdEventType, conf ...FdConfigFlags) ? { + mask := flags_to_mask(events, ...conf) + en.ctl(fd, C.EPOLL_CTL_MOD, mask) ? +} + +fn (mut en EpollNotifier) remove(fd int) ? { + en.ctl(fd, C.EPOLL_CTL_DEL, 0) ? + if en.num_watching > 0 { + en.num_watching-- + } +} + +[direct_array_access] +fn (mut en EpollNotifier) wait(timeout time.Duration) []FdEvent { + if en.events.cap < en.num_watching { + en.events.grow_cap(en.num_watching - en.events.cap) + } + // populate en.events.data with the new events + to := match timeout { + time.infinite { -1 } + else { int(timeout / time.millisecond) } + } + count := C.epoll_wait(en.epoll_fd, en.events.data, en.events.cap, to) + + // set len to count + en.events.clear() + unsafe { en.events.grow_len(count) } + + if count > 0 { + mut arr := []FdEvent{cap: count} + for i := 0; i < count; i++ { + fd := unsafe { en.events[i].data.fd } + kind := event_mask_to_flag(en.events[i].events) + if kind.is_empty() { + // NOTE: tcc only reports the first event for some + // reason, leaving subsequent structs in the array as 0 + // (or possibly garbage) + panic('encountered an empty event kind; this is most likely due to using tcc') + } + arr << &EpollEvent{ + fd: fd + kind: kind + } + } + return arr + } + return [] +} + +fn (mut en EpollNotifier) close() ? { + if C.close(en.epoll_fd) == -1 { + return error(os.posix_get_error_msg(C.errno)) + } +} + +fn event_mask_to_flag(mask u32) FdEventType { + mut flags := FdEventType{} + + if mask & notify.epoll_read != 0 { + flags.set(.read) + } + if mask & notify.epoll_write != 0 { + flags.set(.write) + } + if mask & notify.epoll_peer_hangup != 0 { + flags.set(.peer_hangup) + } + if mask & notify.epoll_exception != 0 { + flags.set(.exception) + } + if mask & notify.epoll_error != 0 { + flags.set(.error) + } + if mask & notify.epoll_hangup != 0 { + flags.set(.hangup) + } + + return flags +} + +fn flags_to_mask(events FdEventType, confs ...FdConfigFlags) u32 { + mut mask := u32(0) + if events.has(.read) { + mask |= notify.epoll_read + } + if events.has(.write) { + mask |= notify.epoll_write + } + if events.has(.peer_hangup) { + mask |= notify.epoll_peer_hangup + } + if events.has(.exception) { + mask |= notify.epoll_exception + } + if events.has(.error) { + mask |= notify.epoll_error + } + if events.has(.hangup) { + mask |= notify.epoll_hangup + } + for conf in confs { + if conf.has(.edge_trigger) { + mask |= notify.epoll_edge_trigger + } + if conf.has(.one_shot) { + mask |= notify.epoll_one_shot + } + if conf.has(.wake_up) { + mask |= notify.epoll_wake_up + } + if conf.has(.exclusive) { + mask |= notify.epoll_exclusive + } + } + return mask +} diff --git a/vlib/os/notify/notify.v b/vlib/os/notify/notify.v new file mode 100644 index 0000000000..b49dad3fdb --- /dev/null +++ b/vlib/os/notify/notify.v @@ -0,0 +1,35 @@ +module notify + +import time + +// Backends should provide a `new() ?FdNotifier` function +pub interface FdNotifier { + add(fd int, events FdEventType, conf ...FdConfigFlags) ? + modify(fd int, events FdEventType, conf ...FdConfigFlags) ? + remove(fd int) ? + wait(timeout time.Duration) []FdEvent + close() ? +} + +pub interface FdEvent { + fd int + kind FdEventType +} + +[flag] +pub enum FdEventType { + read + write + peer_hangup + exception + error + hangup +} + +[flag] +pub enum FdConfigFlags { + edge_trigger + one_shot + wake_up + exclusive +} diff --git a/vlib/os/notify/notify_test.v b/vlib/os/notify/notify_test.v new file mode 100644 index 0000000000..c2cddaaecc --- /dev/null +++ b/vlib/os/notify/notify_test.v @@ -0,0 +1,156 @@ +import os +import os.notify +import time + +// make a pipe and return the (read, write) file descriptors +fn make_pipe() ?(int, int) { + $if linux { + pipefd := [2]int{} + if C.pipe(&pipefd[0]) != 0 { + return error('error $C.errno: ' + os.posix_get_error_msg(C.errno)) + } + return pipefd[0], pipefd[1] + } + return -1, -1 +} + +fn test_level_trigger() ? { + // currently only linux is supported + $if linux { + mut notifier := notify.new() ? + reader, writer := make_pipe() ? + defer { + os.fd_close(reader) + os.fd_close(writer) + notifier.close() or {} + } + notifier.add(reader, .read) ? + + os.fd_write(writer, 'foobar') + check_read_event(notifier, reader, 'foo') + check_read_event(notifier, reader, 'bar') + + assert notifier.wait(0).len == 0 + } +} + +fn test_edge_trigger() ? { + // currently only linux is supported + $if linux { + mut notifier := notify.new() ? + reader, writer := make_pipe() ? + defer { + os.fd_close(reader) + os.fd_close(writer) + notifier.close() or {} + } + notifier.add(reader, .read, .edge_trigger) ? + + os.fd_write(writer, 'foobar') + check_read_event(notifier, reader, 'foo') + + assert notifier.wait(0).len == 0 + + os.fd_write(writer, 'baz') + // we do not get an event because there is still data + // to be read + assert notifier.wait(0).len == 0 + } +} + +fn test_one_shot() ? { + $if linux { + mut notifier := notify.new() ? + reader, writer := make_pipe() ? + defer { + os.fd_close(reader) + os.fd_close(writer) + notifier.close() or {} + } + notifier.add(reader, .read, .one_shot) ? + + os.fd_write(writer, 'foobar') + check_read_event(notifier, reader, 'foo') + os.fd_write(writer, 'baz') + + assert notifier.wait(0).len == 0 + + // rearm + notifier.modify(reader, .read) ? + check_read_event(notifier, reader, 'barbaz') + } +} + +fn test_hangup() ? { + $if linux { + mut notifier := notify.new() ? + reader, writer := make_pipe() ? + defer { + os.fd_close(reader) + notifier.close() or {} + } + notifier.add(reader, .hangup) ? + + assert notifier.wait(0).len == 0 + + // closing on the writer end of the pipe will + // cause a hangup on the reader end + os.fd_close(writer) + events := notifier.wait(0) + assert events.len == 1 + assert events[0].fd == reader + assert events[0].kind.has(.hangup) + } +} + +fn test_write() ? { + $if linux { + mut notifier := notify.new() ? + reader, writer := make_pipe() ? + defer { + os.fd_close(reader) + os.fd_close(writer) + notifier.close() or {} + } + + notifier.add(reader, .write) ? + assert notifier.wait(0).len == 0 + + notifier.add(writer, .write) ? + events := notifier.wait(0) + assert events.len == 1 + assert events[0].fd == writer + assert events[0].kind.has(.write) + } +} + +fn test_remove() ? { + $if linux { + mut notifier := notify.new() ? + reader, writer := make_pipe() ? + defer { + os.fd_close(reader) + os.fd_close(writer) + notifier.close() or {} + } + + // level triggered - will keep getting events while + // there is data to read + notifier.add(reader, .read) ? + os.fd_write(writer, 'foobar') + assert notifier.wait(0).len == 1 + assert notifier.wait(0).len == 1 + + notifier.remove(reader) ? + assert notifier.wait(0).len == 0 + } +} + +fn check_read_event(notifier notify.FdNotifier, reader_fd int, expected string) { + events := notifier.wait(0) + assert events.len == 1 + assert events[0].fd == reader_fd + assert events[0].kind.has(.read) + s, _ := os.fd_read(events[0].fd, expected.len) + assert s == expected +}