diff --git a/include/lnm/loop.h b/include/lnm/loop.h index 7ca372c..4f98349 100644 --- a/include/lnm/loop.h +++ b/include/lnm/loop.h @@ -1,6 +1,7 @@ #ifndef LNM_LOOP #define LNM_LOOP +#include #include #include #include @@ -9,12 +10,17 @@ #define LNM_LOOP_BUF_SIZE 2048 -typedef enum { +typedef enum lnm_loop_state { lnm_loop_state_req = 0, lnm_loop_state_res, lnm_loop_state_end, + lnm_loop_state_req_blocking, + lnm_loop_state_res_blocking, } lnm_loop_state; +/** + * State for a currently active connection + */ typedef struct lnm_loop_conn { int fd; lnm_loop_state state; @@ -30,6 +36,37 @@ typedef struct lnm_loop_conn { } w; } lnm_loop_conn; +/** + * Concurrent fixed-size queue used to distribute work among worker threads + */ +typedef struct lnm_loop_queue { + struct { + lnm_loop_conn **arr; + size_t len; + } buf; + size_t head; + size_t tail; + pthread_mutex_t mutex; + pthread_cond_t cond; +} lnm_loop_queue; + +/** + * Initialize a new queue with the specified fixed capacity + */ +lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap); + +/** + * Queue the given connection. If the queue is currently full, this action + * blocks until there is space available. + */ +void lnm_loop_queue_push(lnm_loop_queue *q, lnm_loop_conn *conn); + +/** + * Pop a connection from the queue. This action blocks until a connection is + * available. + */ +lnm_loop_conn *lnm_loop_queue_pop(lnm_loop_queue *q); + typedef struct lnm_loop { int listen_fd; int epoll_fd; @@ -39,6 +76,7 @@ typedef struct lnm_loop { void (*ctx_free)(void *ctx); void (*data_read)(lnm_loop_conn *conn); void (*data_write)(lnm_loop_conn *conn); + lnm_loop_queue *wq; } lnm_loop; lnm_err lnm_loop_init(lnm_loop **out, void *gctx, @@ -51,4 +89,15 @@ lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port); lnm_err lnm_loop_run(lnm_loop *l, int thread_count); +/** + * Reschedule the given connection, either on the event loop for network IO or + * on a worker thread for blocking work. Connections are terminated as needed. + */ +void lnm_loop_conn_schedule(lnm_loop *l, lnm_loop_conn *conn); + +/** + * Main loop executed on the worker threads. + */ +void lnm_loop_worker_run(void *arg); + #endif diff --git a/src/loop/lnm_loop.c b/src/loop/lnm_loop.c index 6c9a854..5379ecc 100644 --- a/src/loop/lnm_loop.c +++ b/src/loop/lnm_loop.c @@ -62,6 +62,10 @@ lnm_err lnm_loop_accept(lnm_loop *l) { l->open++; + // Make sure to re-arm the listening socket after accepting + event.data.ptr = NULL; + epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, l->listen_fd, &event); + lnm_ldebug(section, "connection opened with fd %i", conn_fd); return lnm_err_ok; @@ -123,6 +127,36 @@ lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port) { return lnm_err_ok; } +void lnm_loop_conn_schedule(lnm_loop *l, lnm_loop_conn *conn) { + switch (conn->state) { + // IO states get rescheduled in the epoll loop + case lnm_loop_state_req: + case lnm_loop_state_res: { + struct epoll_event event = { + .data.ptr = conn, + .events = (conn->state == lnm_loop_state_req ? EPOLLIN : EPOLLOUT) | + EPOLLET | EPOLLONESHOT}; + + epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, conn->fd, &event); + } break; + case lnm_loop_state_req_blocking: + case lnm_loop_state_res_blocking: + lnm_loop_queue_push(l->wq, conn); + break; + case lnm_loop_state_end: { + int conn_fd = conn->fd; + + lnm_loop_conn_free(l, conn); + close(conn_fd); + l->open--; + + epoll_ctl(l->epoll_fd, EPOLL_CTL_DEL, conn_fd, NULL); + + lnm_ldebug(section, "connection closed with fd %i", conn_fd); + } break; + } +} + typedef struct lnm_loop_thread_args { lnm_loop *l; int id; @@ -143,9 +177,6 @@ lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) { lnm_linfo(section, "thread %i started", thread_id); - struct epoll_event listen_event = { - .data.ptr = NULL, .events = EPOLLIN | EPOLLET | EPOLLONESHOT}; - while (1) { int polled = epoll_wait(l->epoll_fd, events, events_cap, -1); lnm_ldebug(section, "polled (thread %i): %i", thread_id, polled); @@ -157,30 +188,12 @@ lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) { for (int i = 0; i < polled; i++) { if (events[i].data.ptr == NULL) { lnm_loop_accept(l); - - epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, l->listen_fd, &listen_event); } else { lnm_loop_conn *conn = events[i].data.ptr; + + // At this point, state is always an IO state lnm_loop_conn_io(l, conn); - - if (conn->state == lnm_loop_state_end) { - int conn_fd = conn->fd; - - lnm_loop_conn_free(l, conn); - close(conn_fd); - l->open--; - - epoll_ctl(l->epoll_fd, EPOLL_CTL_DEL, conn_fd, NULL); - - lnm_ldebug(section, "connection closed with fd %i", conn_fd); - } else { - struct epoll_event event = { - .data.ptr = conn, - .events = - (conn->state == lnm_loop_state_req ? EPOLLIN : EPOLLOUT) | - EPOLLET | EPOLLONESHOT}; - epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, conn->fd, &event); - } + lnm_loop_conn_schedule(l, conn); } } diff --git a/src/loop/lnm_loop_worker.c b/src/loop/lnm_loop_worker.c new file mode 100644 index 0000000..2f315cb --- /dev/null +++ b/src/loop/lnm_loop_worker.c @@ -0,0 +1,84 @@ +#include + +#include "lnm/loop.h" + +lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap) { + lnm_loop_conn **arr = malloc(cap * sizeof(lnm_loop_conn *)); + + if (arr == NULL) { + return lnm_err_failed_alloc; + } + + lnm_loop_queue *q = calloc(1, sizeof(lnm_loop_queue)); + + if (q == NULL) { + free(arr); + + return lnm_err_failed_alloc; + } + + q->buf.arr = arr; + q->buf.len = cap; + + pthread_mutex_init(&q->mutex, NULL); + pthread_cond_init(&q->cond, NULL); + + *out = q; + + return lnm_err_ok; +} + +void lnm_loop_queue_push(lnm_loop_queue *q, lnm_loop_conn *conn) { + pthread_mutex_lock(&q->mutex); + + while (q->tail == q->head) { + pthread_cond_wait(&q->cond, &q->mutex); + } + + q->buf.arr[q->tail] = conn; + + // Make sure the index wraps around + q->tail = (q->tail + 1) % q->buf.len; + + // Unlock mutex and signal to waiting threads + pthread_mutex_unlock(&q->mutex); + pthread_cond_signal(&q->cond); +} + +lnm_loop_conn *lnm_loop_queue_pop(lnm_loop_queue *q) { + pthread_mutex_lock(&q->mutex); + + while (q->tail == q->head) { + pthread_cond_wait(&q->cond, &q->mutex); + } + + lnm_loop_conn *out = q->buf.arr[q->head]; + q->head = (q->head + 1) % q->buf.len; + + // Unlock mutex and signal to waiting threads + pthread_mutex_unlock(&q->mutex); + pthread_cond_signal(&q->cond); + + return out; +} + +void lnm_loop_worker_run(void *arg) { + lnm_loop *l = arg; + lnm_loop_queue *q = l->wq; + + while (1) { + lnm_loop_conn *conn = lnm_loop_queue_pop(q); + + switch (conn->state) { + case lnm_loop_state_req_blocking: + l->data_read(conn); + break; + case lnm_loop_state_res_blocking: + l->data_write(conn); + // Other states shouldn't even end up here, so we ignore them + default:; + } + + lnm_loop_conn_schedule(l, conn); + } +}