picoev: error handling workaround (#9913)

pull/9953/head
Anton Zavodchikov 2021-05-01 16:20:10 +05:00 committed by GitHub
parent f82f1977d1
commit b621595c05
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 117 additions and 90 deletions

View File

@ -2,6 +2,10 @@ import json
import picoev import picoev
import picohttpparser import picohttpparser
const (
port = 8088
)
struct Message { struct Message {
message string message string
} }
@ -43,6 +47,6 @@ fn callback(data voidptr, req picohttpparser.Request, mut res picohttpparser.Res
} }
fn main() { fn main() {
println('Starting webserver on http://127.0.0.1:8088/ ...') println('Starting webserver on http://127.0.0.1:$port/ ...')
picoev.new(8088, &callback).serve() picoev.new(port: port, cb: &callback).serve()
} }

View File

@ -7,63 +7,26 @@ import net
import picohttpparser import picohttpparser
#include <errno.h> #include <errno.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <netinet/tcp.h> #include <netinet/tcp.h>
#include <fcntl.h>
#include <signal.h> #include <signal.h>
#flag -I @VEXEROOT/thirdparty/picoev #flag -I @VEXEROOT/thirdparty/picoev
#flag -L @VEXEROOT/thirdparty/picoev #flag -L @VEXEROOT/thirdparty/picoev
#flag @VEXEROOT/thirdparty/picoev/picoev.o #flag @VEXEROOT/thirdparty/picoev/picoev.o
#include "src/picoev.h" #include "src/picoev.h"
const (
max_fds = 1024
max_timeout = 10
max_read = 4096
max_write = 8192
)
struct C.in_addr {
mut:
s_addr int
}
struct C.sockaddr_in {
mut:
sin_family int
sin_port int
sin_addr C.in_addr
}
struct C.sockaddr_storage {}
fn C.atoi() int fn C.atoi() int
fn C.strncasecmp(s1 charptr, s2 charptr, n size_t) int fn C.strncasecmp(s1 &char, s2 &char, n size_t) int
struct C.picoev_loop {} struct C.picoev_loop {}
struct Picoev {
loop &C.picoev_loop
cb fn(data voidptr, req picohttpparser.Request, mut res picohttpparser.Response)
mut:
date byteptr
buf byteptr
idx [max_fds]int
out byteptr
pub mut:
user_data voidptr = voidptr(0)
timeout_secs int = 8
max_headers int = 100
}
fn C.picoev_del(&C.picoev_loop, int) int fn C.picoev_del(&C.picoev_loop, int) int
fn C.picoev_set_timeout(&C.picoev_loop, int, int) fn C.picoev_set_timeout(&C.picoev_loop, int, int)
// fn C.picoev_handler(loop &C.picoev_loop, fd int, revents int, cb_arg voidptr) // fn C.picoev_handler(loop &C.picoev_loop, fd int, revents int, cb_arg voidptr)
// TODO: (sponge) update to C.picoev_handler with C type def update // TODO: (sponge) update to C.picoev_handler with C type def update
type Cpicoev_handler = fn(loop &C.picoev_loop, fd int, revents int, cb_arg voidptr) type Cpicoev_handler = fn (loop &C.picoev_loop, fd int, revents int, context voidptr)
fn C.picoev_add(&C.picoev_loop, int, int, int, &Cpicoev_handler, voidptr) int fn C.picoev_add(&C.picoev_loop, int, int, int, &Cpicoev_handler, voidptr) int
@ -77,14 +40,54 @@ fn C.picoev_destroy_loop(&C.picoev_loop) int
fn C.picoev_deinit() int fn C.picoev_deinit() int
const (
max_fds = 1024
max_timeout = 10
max_read = 4096
max_write = 8192
)
enum Event {
read = C.PICOEV_READ
write = C.PICOEV_WRITE
timeout = C.PICOEV_TIMEOUT
add = C.PICOEV_ADD
del = C.PICOEV_DEL
readwrite = C.PICOEV_READWRITE
}
pub struct Config {
pub:
port int = 8080
cb fn (voidptr, picohttpparser.Request, mut picohttpparser.Response)
err_cb fn (voidptr, picohttpparser.Request, mut picohttpparser.Response, IError) = default_err_cb
user_data voidptr = voidptr(0)
timeout_secs int = 8
max_headers int = 100
}
struct Picoev {
loop &C.picoev_loop
cb fn (voidptr, picohttpparser.Request, mut picohttpparser.Response)
err_cb fn (voidptr, picohttpparser.Request, mut picohttpparser.Response, IError)
user_data voidptr
timeout_secs int
max_headers int
mut:
date &byte
buf &byte
idx [1024]int
out &byte
}
[inline] [inline]
fn setup_sock(fd int) { fn setup_sock(fd int) ? {
on := 1 flag := 1
if C.setsockopt(fd, C.IPPROTO_TCP, C.TCP_NODELAY, &on, sizeof(int)) < 0 { if C.setsockopt(fd, C.IPPROTO_TCP, C.TCP_NODELAY, &flag, sizeof(int)) < 0 {
println('setup_sock.setup_sock failed') return error('setup_sock.setup_sock failed')
} }
if C.fcntl(fd, C.F_SETFL, C.O_NONBLOCK) != 0 { if C.fcntl(fd, C.F_SETFL, C.O_NONBLOCK) != 0 {
println('fcntl failed') return error('fcntl failed')
} }
} }
@ -95,40 +98,57 @@ fn close_conn(loop &C.picoev_loop, fd int) {
} }
[inline] [inline]
fn req_read(fd int, b byteptr, max_len int, idx int) int { fn req_read(fd int, b &byte, max_len int, idx int) int {
unsafe { unsafe {
return C.read(fd, b + idx, max_len - idx) return C.read(fd, b + idx, max_len - idx)
} }
} }
fn rw_callback(loop &C.picoev_loop, fd int, events int, cb_arg voidptr) { fn rw_callback(loop &C.picoev_loop, fd int, events int, context voidptr) {
mut p := unsafe {&Picoev(cb_arg)} mut p := unsafe { &Picoev(context) }
if (events & C.PICOEV_TIMEOUT) != 0 { defer {
close_conn(loop, fd)
p.idx[fd] = 0 p.idx[fd] = 0
}
if (events & int(Event.timeout)) != 0 {
close_conn(loop, fd)
return return
} else if (events & C.PICOEV_READ) != 0 { } else if (events & int(Event.read)) != 0 {
C.picoev_set_timeout(loop, fd, p.timeout_secs) C.picoev_set_timeout(loop, fd, p.timeout_secs)
// Request init // Request init
mut buf := p.buf mut buf := p.buf
unsafe { unsafe {
buf += fd * max_read /* pointer magic */ buf += fd * picoev.max_read // pointer magic
} }
mut req := picohttpparser.Request{} mut req := picohttpparser.Request{}
for { /* Request parsing loop */ // Response init
r := req_read(fd, buf, max_read, p.idx[fd]) // Get data from socket mut out := p.out
if r == 0 { /* connection closed by peer */ unsafe {
out += fd * picoev.max_write // pointer magic
}
mut res := picohttpparser.Response{
fd: fd
date: p.date
buf_start: out
buf: out
}
for {
// Request parsing loop
r := req_read(fd, buf, picoev.max_read, p.idx[fd]) // Get data from socket
if r == 0 {
// connection closed by peer
close_conn(loop, fd) close_conn(loop, fd)
p.idx[fd] = 0
return return
} else if r == -1 { /* error */ } else if r == -1 {
if C.errno == C.EAGAIN || C.errno == C.EWOULDBLOCK { /* try again later */ // error
if C.errno == C.EAGAIN || C.errno == C.EWOULDBLOCK {
// try again later
return return
} else { /* fatal error */ } else {
// fatal error
close_conn(loop, fd) close_conn(loop, fd)
p.idx[fd] = 0
return return
} }
} }
@ -139,32 +159,20 @@ fn rw_callback(loop &C.picoev_loop, fd int, events int, cb_arg voidptr) {
if pret > 0 { // Success if pret > 0 { // Success
break break
} else if pret == -1 { // Parse error } else if pret == -1 { // Parse error
eprintln("picohttprequest: ParseError") p.err_cb(mut p.user_data, req, mut &res, error('ParseError'))
return return
} }
assert pret == -2 /* request is incomplete, continue the loop */ assert pret == -2
// request is incomplete, continue the loop
if p.idx[fd] == sizeof(buf) { if p.idx[fd] == sizeof(buf) {
eprintln("picohttprequest: RequestIsTooLongError") p.err_cb(mut p.user_data, req, mut &res, error('RequestIsTooLongError'))
return return
} }
} }
// Response init
mut out := p.out
unsafe {
out += fd * max_write /* pointer magic */
}
mut res := picohttpparser.Response{
fd: fd
date: p.date
buf_start: out
buf: out
}
// Callback (should call .end() itself) // Callback (should call .end() itself)
p.cb(mut p.user_data, req, mut &res) p.cb(mut p.user_data, req, mut &res)
p.idx[fd] = 0
} }
} }
@ -172,12 +180,20 @@ fn accept_callback(loop &C.picoev_loop, fd int, events int, cb_arg voidptr) {
mut p := unsafe { &Picoev(cb_arg) } mut p := unsafe { &Picoev(cb_arg) }
newfd := C.accept(fd, 0, 0) newfd := C.accept(fd, 0, 0)
if newfd != -1 { if newfd != -1 {
setup_sock(newfd) setup_sock(newfd) or {
C.picoev_add(loop, newfd, C.PICOEV_READ, p.timeout_secs, rw_callback, cb_arg) p.err_cb(mut p.user_data, picohttpparser.Request{}, mut &picohttpparser.Response{},
err)
}
C.picoev_add(loop, newfd, int(Event.read), p.timeout_secs, rw_callback, cb_arg)
} }
} }
pub fn new(port int, cb voidptr) &Picoev { fn default_err_cb(data voidptr, req picohttpparser.Request, mut res picohttpparser.Response, error IError) {
eprintln('picoev: $error')
res.end()
}
pub fn new(config Config) &Picoev {
fd := C.socket(net.SocketFamily.inet, net.SocketType.tcp, 0) fd := C.socket(net.SocketFamily.inet, net.SocketType.tcp, 0)
assert fd != -1 assert fd != -1
flag := 1 flag := 1
@ -192,24 +208,31 @@ pub fn new(port int, cb voidptr) &Picoev {
} }
mut addr := C.sockaddr_in{} mut addr := C.sockaddr_in{}
addr.sin_family = C.AF_INET addr.sin_family = C.AF_INET
addr.sin_port = C.htons(port) addr.sin_port = C.htons(config.port)
addr.sin_addr.s_addr = C.htonl(C.INADDR_ANY) addr.sin_addr.s_addr = C.htonl(C.INADDR_ANY)
size := 16 // sizeof(C.sockaddr_in) size := 16 // sizeof(C.sockaddr_in)
bind_res := C.bind(fd, &addr, size) bind_res := C.bind(fd, &addr, size)
assert bind_res == 0 assert bind_res == 0
listen_res := C.listen(fd, C.SOMAXCONN) listen_res := C.listen(fd, C.SOMAXCONN)
assert listen_res == 0 assert listen_res == 0
setup_sock(fd) setup_sock(fd) or {
C.picoev_init(max_fds) config.err_cb(mut config.user_data, picohttpparser.Request{}, mut &picohttpparser.Response{},
loop := C.picoev_create_loop(max_timeout) err)
}
C.picoev_init(picoev.max_fds)
loop := C.picoev_create_loop(picoev.max_timeout)
mut pv := &Picoev{ mut pv := &Picoev{
loop: loop loop: loop
cb: cb cb: config.cb
err_cb: config.err_cb
user_data: config.user_data
timeout_secs: config.timeout_secs
max_headers: config.max_headers
date: C.get_date() date: C.get_date()
buf: unsafe { malloc(max_fds * max_read + 1) } buf: unsafe { malloc(picoev.max_fds * picoev.max_read + 1) }
out: unsafe { malloc(max_fds * max_write + 1) } out: unsafe { malloc(picoev.max_fds * picoev.max_write + 1) }
} }
C.picoev_add(loop, fd, C.PICOEV_READ, 0, accept_callback, pv) C.picoev_add(loop, fd, int(Event.read), 0, accept_callback, pv)
go update_date(mut pv) go update_date(mut pv)
return pv return pv
} }