diff --git a/include/lnm/loop.h b/include/lnm/loop.h index 4c33aa5..4f98349 100644 --- a/include/lnm/loop.h +++ b/include/lnm/loop.h @@ -10,12 +10,17 @@ #define LNM_LOOP_BUF_SIZE 2048 -typedef enum { +typedef enum lnm_loop_state { lnm_loop_state_req = 0, lnm_loop_state_res, lnm_loop_state_end, + lnm_loop_state_req_blocking, + lnm_loop_state_res_blocking, } lnm_loop_state; +/** + * State for a currently active connection + */ typedef struct lnm_loop_conn { int fd; lnm_loop_state state; @@ -31,29 +36,8 @@ typedef struct lnm_loop_conn { } w; } lnm_loop_conn; -typedef struct lnm_loop { - int listen_fd; - int epoll_fd; - atomic_int open; - void *gctx; - lnm_err (*ctx_init)(void **out, void *gctx); - void (*ctx_free)(void *ctx); - void (*data_read)(lnm_loop_conn *conn); - void (*data_write)(lnm_loop_conn *conn); -} lnm_loop; - -lnm_err lnm_loop_init(lnm_loop **out, void *gctx, - lnm_err (*ctx_init)(void **out, void *gctx), - void (*ctx_free)(void *ctx), - void (*data_read)(lnm_loop_conn *conn), - void (*data_write)(lnm_loop_conn *conn)); - -lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port); - -lnm_err lnm_loop_run(lnm_loop *l, int thread_count); - /** - * Concurrent fixed-size queue used by the worked threads to distribute tasks + * Concurrent fixed-size queue used to distribute work among worker threads */ typedef struct lnm_loop_queue { struct { @@ -83,4 +67,37 @@ void lnm_loop_queue_push(lnm_loop_queue *q, lnm_loop_conn *conn); */ lnm_loop_conn *lnm_loop_queue_pop(lnm_loop_queue *q); +typedef struct lnm_loop { + int listen_fd; + int epoll_fd; + atomic_int open; + void *gctx; + lnm_err (*ctx_init)(void **out, void *gctx); + void (*ctx_free)(void *ctx); + void (*data_read)(lnm_loop_conn *conn); + void (*data_write)(lnm_loop_conn *conn); + lnm_loop_queue *wq; +} lnm_loop; + +lnm_err lnm_loop_init(lnm_loop **out, void *gctx, + lnm_err (*ctx_init)(void **out, void *gctx), + void (*ctx_free)(void *ctx), + void (*data_read)(lnm_loop_conn *conn), + void (*data_write)(lnm_loop_conn *conn)); + +lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port); + +lnm_err lnm_loop_run(lnm_loop *l, int thread_count); + +/** + * Reschedule the given connection, either on the event loop for network IO or + * on a worker thread for blocking work. Connections are terminated as needed. + */ +void lnm_loop_conn_schedule(lnm_loop *l, lnm_loop_conn *conn); + +/** + * Main loop executed on the worker threads. + */ +void lnm_loop_worker_run(void *arg); + #endif diff --git a/src/loop/lnm_loop.c b/src/loop/lnm_loop.c index 6c9a854..5379ecc 100644 --- a/src/loop/lnm_loop.c +++ b/src/loop/lnm_loop.c @@ -62,6 +62,10 @@ lnm_err lnm_loop_accept(lnm_loop *l) { l->open++; + // Make sure to re-arm the listening socket after accepting + event.data.ptr = NULL; + epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, l->listen_fd, &event); + lnm_ldebug(section, "connection opened with fd %i", conn_fd); return lnm_err_ok; @@ -123,6 +127,36 @@ lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port) { return lnm_err_ok; } +void lnm_loop_conn_schedule(lnm_loop *l, lnm_loop_conn *conn) { + switch (conn->state) { + // IO states get rescheduled in the epoll loop + case lnm_loop_state_req: + case lnm_loop_state_res: { + struct epoll_event event = { + .data.ptr = conn, + .events = (conn->state == lnm_loop_state_req ? EPOLLIN : EPOLLOUT) | + EPOLLET | EPOLLONESHOT}; + + epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, conn->fd, &event); + } break; + case lnm_loop_state_req_blocking: + case lnm_loop_state_res_blocking: + lnm_loop_queue_push(l->wq, conn); + break; + case lnm_loop_state_end: { + int conn_fd = conn->fd; + + lnm_loop_conn_free(l, conn); + close(conn_fd); + l->open--; + + epoll_ctl(l->epoll_fd, EPOLL_CTL_DEL, conn_fd, NULL); + + lnm_ldebug(section, "connection closed with fd %i", conn_fd); + } break; + } +} + typedef struct lnm_loop_thread_args { lnm_loop *l; int id; @@ -143,9 +177,6 @@ lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) { lnm_linfo(section, "thread %i started", thread_id); - struct epoll_event listen_event = { - .data.ptr = NULL, .events = EPOLLIN | EPOLLET | EPOLLONESHOT}; - while (1) { int polled = epoll_wait(l->epoll_fd, events, events_cap, -1); lnm_ldebug(section, "polled (thread %i): %i", thread_id, polled); @@ -157,30 +188,12 @@ lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) { for (int i = 0; i < polled; i++) { if (events[i].data.ptr == NULL) { lnm_loop_accept(l); - - epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, l->listen_fd, &listen_event); } else { lnm_loop_conn *conn = events[i].data.ptr; + + // At this point, state is always an IO state lnm_loop_conn_io(l, conn); - - if (conn->state == lnm_loop_state_end) { - int conn_fd = conn->fd; - - lnm_loop_conn_free(l, conn); - close(conn_fd); - l->open--; - - epoll_ctl(l->epoll_fd, EPOLL_CTL_DEL, conn_fd, NULL); - - lnm_ldebug(section, "connection closed with fd %i", conn_fd); - } else { - struct epoll_event event = { - .data.ptr = conn, - .events = - (conn->state == lnm_loop_state_req ? EPOLLIN : EPOLLOUT) | - EPOLLET | EPOLLONESHOT}; - epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, conn->fd, &event); - } + lnm_loop_conn_schedule(l, conn); } } diff --git a/src/loop/lnm_loop_worker.c b/src/loop/lnm_loop_worker.c index 067f1e8..2f315cb 100644 --- a/src/loop/lnm_loop_worker.c +++ b/src/loop/lnm_loop_worker.c @@ -1,3 +1,5 @@ +#include + #include "lnm/loop.h" lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap) { @@ -59,3 +61,24 @@ lnm_loop_conn *lnm_loop_queue_pop(lnm_loop_queue *q) { return out; } + +void lnm_loop_worker_run(void *arg) { + lnm_loop *l = arg; + lnm_loop_queue *q = l->wq; + + while (1) { + lnm_loop_conn *conn = lnm_loop_queue_pop(q); + + switch (conn->state) { + case lnm_loop_state_req_blocking: + l->data_read(conn); + break; + case lnm_loop_state_res_blocking: + l->data_write(conn); + // Other states shouldn't even end up here, so we ignore them + default:; + } + + lnm_loop_conn_schedule(l, conn); + } +}