os/notify: make epoll wrapper thread safe (#10473)

pull/10477/head
Miccah 2021-06-15 20:03:33 -05:00 committed by GitHub
parent aaa59ac770
commit a0b7e1a0ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 28 additions and 19 deletions

View File

@ -24,14 +24,15 @@ fn C.epoll_ctl(int, int, int, &C.epoll_event) int
fn C.epoll_wait(int, &C.epoll_event, int, int) int fn C.epoll_wait(int, &C.epoll_event, int, int) int
// EpollNotifier provides methods that implement FdNotifier using the
// epoll I/O event notification facility (linux only)
[noinit] [noinit]
struct EpollNotifier { struct EpollNotifier {
epoll_fd int epoll_fd int
mut:
num_watching int // heuristic
events []C.epoll_event
} }
// EpollEvent describes an event that occurred for a file descriptor in
// the watch list
[noinit] [noinit]
struct EpollEvent { struct EpollEvent {
pub: pub:
@ -39,6 +40,9 @@ pub:
kind FdEventType kind FdEventType
} }
// new creates a new EpollNotifier
// The FdNotifier interface is returned to allow OS specific
// implementations without exposing the concrete type
pub fn new() ?FdNotifier { pub fn new() ?FdNotifier {
fd := C.epoll_create1(0) // 0 indicates default behavior fd := C.epoll_create1(0) // 0 indicates default behavior
if fd == -1 { if fd == -1 {
@ -64,6 +68,7 @@ const (
epoll_exclusive = u32(C.EPOLLEXCLUSIVE) epoll_exclusive = u32(C.EPOLLEXCLUSIVE)
) )
// ctl is a helper method for add, modify, and remove
fn (mut en EpollNotifier) ctl(fd int, op int, mask u32) ? { fn (mut en EpollNotifier) ctl(fd int, op int, mask u32) ? {
event := C.epoll_event{ event := C.epoll_event{
events: mask events: mask
@ -76,45 +81,43 @@ fn (mut en EpollNotifier) ctl(fd int, op int, mask u32) ? {
} }
} }
// add adds a file descriptor to the watch list
fn (mut en EpollNotifier) add(fd int, events FdEventType, conf ...FdConfigFlags) ? { fn (mut en EpollNotifier) add(fd int, events FdEventType, conf ...FdConfigFlags) ? {
mask := flags_to_mask(events, ...conf) mask := flags_to_mask(events, ...conf)
en.ctl(fd, C.EPOLL_CTL_ADD, mask) ? en.ctl(fd, C.EPOLL_CTL_ADD, mask) ?
en.num_watching++
} }
// modify sets an existing entry in the watch list to the provided events and configuration
fn (mut en EpollNotifier) modify(fd int, events FdEventType, conf ...FdConfigFlags) ? { fn (mut en EpollNotifier) modify(fd int, events FdEventType, conf ...FdConfigFlags) ? {
mask := flags_to_mask(events, ...conf) mask := flags_to_mask(events, ...conf)
en.ctl(fd, C.EPOLL_CTL_MOD, mask) ? en.ctl(fd, C.EPOLL_CTL_MOD, mask) ?
} }
// remove removes a file descriptor from the watch list
fn (mut en EpollNotifier) remove(fd int) ? { fn (mut en EpollNotifier) remove(fd int) ? {
en.ctl(fd, C.EPOLL_CTL_DEL, 0) ? en.ctl(fd, C.EPOLL_CTL_DEL, 0) ?
if en.num_watching > 0 {
en.num_watching--
}
} }
[direct_array_access] // wait waits to be notified of events on the watch list,
// returns at most 512 events
fn (mut en EpollNotifier) wait(timeout time.Duration) []FdEvent { fn (mut en EpollNotifier) wait(timeout time.Duration) []FdEvent {
if en.events.cap < en.num_watching { // arbitrary 512 limit; events will round robin on successive
en.events.grow_cap(en.num_watching - en.events.cap) // waits if the number exceeds this
} // NOTE: we use a fixed size array here for stack allocation; this has
// populate en.events.data with the new events // the added bonus of making EpollNotifier thread safe
events := [512]C.epoll_event{}
// populate events with the new events
to := match timeout { to := match timeout {
time.infinite { -1 } time.infinite { -1 }
else { int(timeout / time.millisecond) } else { int(timeout / time.millisecond) }
} }
count := C.epoll_wait(en.epoll_fd, en.events.data, en.events.cap, to) count := C.epoll_wait(en.epoll_fd, &events[0], events.len, to)
// set len to count
en.events.clear()
unsafe { en.events.grow_len(count) }
if count > 0 { if count > 0 {
mut arr := []FdEvent{cap: count} mut arr := []FdEvent{cap: count}
for i := 0; i < count; i++ { for i := 0; i < count; i++ {
fd := unsafe { en.events[i].data.fd } fd := unsafe { events[i].data.fd }
kind := event_mask_to_flag(en.events[i].events) kind := event_mask_to_flag(events[i].events)
if kind.is_empty() { if kind.is_empty() {
// NOTE: tcc only reports the first event for some // NOTE: tcc only reports the first event for some
// reason, leaving subsequent structs in the array as 0 // reason, leaving subsequent structs in the array as 0
@ -131,12 +134,16 @@ fn (mut en EpollNotifier) wait(timeout time.Duration) []FdEvent {
return [] return []
} }
// close closes the EpollNotifier,
// any successive calls to add, modify, remove, and wait should fail
fn (mut en EpollNotifier) close() ? { fn (mut en EpollNotifier) close() ? {
if C.close(en.epoll_fd) == -1 { if C.close(en.epoll_fd) == -1 {
return error(os.posix_get_error_msg(C.errno)) return error(os.posix_get_error_msg(C.errno))
} }
} }
// event_mask_to_flag is a helper function that converts a bitmask
// returned by epoll_wait to FdEventType
fn event_mask_to_flag(mask u32) FdEventType { fn event_mask_to_flag(mask u32) FdEventType {
mut flags := FdEventType{} mut flags := FdEventType{}
@ -162,6 +169,8 @@ fn event_mask_to_flag(mask u32) FdEventType {
return flags return flags
} }
// flags_to_mask is a helper function that converts FdEventType and
// FdConfigFlags to a bitmask used by the C functions
fn flags_to_mask(events FdEventType, confs ...FdConfigFlags) u32 { fn flags_to_mask(events FdEventType, confs ...FdConfigFlags) u32 {
mut mask := u32(0) mut mask := u32(0)
if events.has(.read) { if events.has(.read) {