diff --git a/include/lnm/loop.h b/include/lnm/loop.h index 4f98349..7ca372c 100644 --- a/include/lnm/loop.h +++ b/include/lnm/loop.h @@ -1,7 +1,6 @@ #ifndef LNM_LOOP #define LNM_LOOP -#include #include #include #include @@ -10,17 +9,12 @@ #define LNM_LOOP_BUF_SIZE 2048 -typedef enum lnm_loop_state { +typedef enum { lnm_loop_state_req = 0, lnm_loop_state_res, lnm_loop_state_end, - lnm_loop_state_req_blocking, - lnm_loop_state_res_blocking, } lnm_loop_state; -/** - * State for a currently active connection - */ typedef struct lnm_loop_conn { int fd; lnm_loop_state state; @@ -36,37 +30,6 @@ typedef struct lnm_loop_conn { } w; } lnm_loop_conn; -/** - * Concurrent fixed-size queue used to distribute work among worker threads - */ -typedef struct lnm_loop_queue { - struct { - lnm_loop_conn **arr; - size_t len; - } buf; - size_t head; - size_t tail; - pthread_mutex_t mutex; - pthread_cond_t cond; -} lnm_loop_queue; - -/** - * Initialize a new queue with the specified fixed capacity - */ -lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap); - -/** - * Queue the given connection. If the queue is currently full, this action - * blocks until there is space available. - */ -void lnm_loop_queue_push(lnm_loop_queue *q, lnm_loop_conn *conn); - -/** - * Pop a connection from the queue. This action blocks until a connection is - * available. - */ -lnm_loop_conn *lnm_loop_queue_pop(lnm_loop_queue *q); - typedef struct lnm_loop { int listen_fd; int epoll_fd; @@ -76,7 +39,6 @@ typedef struct lnm_loop { void (*ctx_free)(void *ctx); void (*data_read)(lnm_loop_conn *conn); void (*data_write)(lnm_loop_conn *conn); - lnm_loop_queue *wq; } lnm_loop; lnm_err lnm_loop_init(lnm_loop **out, void *gctx, @@ -89,15 +51,4 @@ lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port); lnm_err lnm_loop_run(lnm_loop *l, int thread_count); -/** - * Reschedule the given connection, either on the event loop for network IO or - * on a worker thread for blocking work. Connections are terminated as needed. - */ -void lnm_loop_conn_schedule(lnm_loop *l, lnm_loop_conn *conn); - -/** - * Main loop executed on the worker threads. - */ -void lnm_loop_worker_run(void *arg); - #endif diff --git a/src/loop/lnm_loop.c b/src/loop/lnm_loop.c index 5379ecc..6c9a854 100644 --- a/src/loop/lnm_loop.c +++ b/src/loop/lnm_loop.c @@ -62,10 +62,6 @@ lnm_err lnm_loop_accept(lnm_loop *l) { l->open++; - // Make sure to re-arm the listening socket after accepting - event.data.ptr = NULL; - epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, l->listen_fd, &event); - lnm_ldebug(section, "connection opened with fd %i", conn_fd); return lnm_err_ok; @@ -127,36 +123,6 @@ lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port) { return lnm_err_ok; } -void lnm_loop_conn_schedule(lnm_loop *l, lnm_loop_conn *conn) { - switch (conn->state) { - // IO states get rescheduled in the epoll loop - case lnm_loop_state_req: - case lnm_loop_state_res: { - struct epoll_event event = { - .data.ptr = conn, - .events = (conn->state == lnm_loop_state_req ? EPOLLIN : EPOLLOUT) | - EPOLLET | EPOLLONESHOT}; - - epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, conn->fd, &event); - } break; - case lnm_loop_state_req_blocking: - case lnm_loop_state_res_blocking: - lnm_loop_queue_push(l->wq, conn); - break; - case lnm_loop_state_end: { - int conn_fd = conn->fd; - - lnm_loop_conn_free(l, conn); - close(conn_fd); - l->open--; - - epoll_ctl(l->epoll_fd, EPOLL_CTL_DEL, conn_fd, NULL); - - lnm_ldebug(section, "connection closed with fd %i", conn_fd); - } break; - } -} - typedef struct lnm_loop_thread_args { lnm_loop *l; int id; @@ -177,6 +143,9 @@ lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) { lnm_linfo(section, "thread %i started", thread_id); + struct epoll_event listen_event = { + .data.ptr = NULL, .events = EPOLLIN | EPOLLET | EPOLLONESHOT}; + while (1) { int polled = epoll_wait(l->epoll_fd, events, events_cap, -1); lnm_ldebug(section, "polled (thread %i): %i", thread_id, polled); @@ -188,12 +157,30 @@ lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) { for (int i = 0; i < polled; i++) { if (events[i].data.ptr == NULL) { lnm_loop_accept(l); + + epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, l->listen_fd, &listen_event); } else { lnm_loop_conn *conn = events[i].data.ptr; - - // At this point, state is always an IO state lnm_loop_conn_io(l, conn); - lnm_loop_conn_schedule(l, conn); + + if (conn->state == lnm_loop_state_end) { + int conn_fd = conn->fd; + + lnm_loop_conn_free(l, conn); + close(conn_fd); + l->open--; + + epoll_ctl(l->epoll_fd, EPOLL_CTL_DEL, conn_fd, NULL); + + lnm_ldebug(section, "connection closed with fd %i", conn_fd); + } else { + struct epoll_event event = { + .data.ptr = conn, + .events = + (conn->state == lnm_loop_state_req ? EPOLLIN : EPOLLOUT) | + EPOLLET | EPOLLONESHOT}; + epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, conn->fd, &event); + } } } diff --git a/src/loop/lnm_loop_worker.c b/src/loop/lnm_loop_worker.c deleted file mode 100644 index 2f315cb..0000000 --- a/src/loop/lnm_loop_worker.c +++ /dev/null @@ -1,84 +0,0 @@ -#include - -#include "lnm/loop.h" - -lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap) { - lnm_loop_conn **arr = malloc(cap * sizeof(lnm_loop_conn *)); - - if (arr == NULL) { - return lnm_err_failed_alloc; - } - - lnm_loop_queue *q = calloc(1, sizeof(lnm_loop_queue)); - - if (q == NULL) { - free(arr); - - return lnm_err_failed_alloc; - } - - q->buf.arr = arr; - q->buf.len = cap; - - pthread_mutex_init(&q->mutex, NULL); - pthread_cond_init(&q->cond, NULL); - - *out = q; - - return lnm_err_ok; -} - -void lnm_loop_queue_push(lnm_loop_queue *q, lnm_loop_conn *conn) { - pthread_mutex_lock(&q->mutex); - - while (q->tail == q->head) { - pthread_cond_wait(&q->cond, &q->mutex); - } - - q->buf.arr[q->tail] = conn; - - // Make sure the index wraps around - q->tail = (q->tail + 1) % q->buf.len; - - // Unlock mutex and signal to waiting threads - pthread_mutex_unlock(&q->mutex); - pthread_cond_signal(&q->cond); -} - -lnm_loop_conn *lnm_loop_queue_pop(lnm_loop_queue *q) { - pthread_mutex_lock(&q->mutex); - - while (q->tail == q->head) { - pthread_cond_wait(&q->cond, &q->mutex); - } - - lnm_loop_conn *out = q->buf.arr[q->head]; - q->head = (q->head + 1) % q->buf.len; - - // Unlock mutex and signal to waiting threads - pthread_mutex_unlock(&q->mutex); - pthread_cond_signal(&q->cond); - - return out; -} - -void lnm_loop_worker_run(void *arg) { - lnm_loop *l = arg; - lnm_loop_queue *q = l->wq; - - while (1) { - lnm_loop_conn *conn = lnm_loop_queue_pop(q); - - switch (conn->state) { - case lnm_loop_state_req_blocking: - l->data_read(conn); - break; - case lnm_loop_state_res_blocking: - l->data_write(conn); - // Other states shouldn't even end up here, so we ignore them - default:; - } - - lnm_loop_conn_schedule(l, conn); - } -}