diff --git a/example/blocking.c b/example/blocking.c index a145ed5..9b395b4 100644 --- a/example/blocking.c +++ b/example/blocking.c @@ -27,12 +27,12 @@ int main() { ctx_reset, ctx_free); - lnm_http_step_append(&step, slow_step, true); + lnm_http_step_init(&step, slow_step); lnm_http_route_init_literal(&route, lnm_http_method_get, "/", step); lnm_http_loop_route_add(hl, route); lnm_log_init_global(); lnm_log_register_stdout(lnm_log_level_debug); - printf("res = %i\n", lnm_http_loop_run(hl, 8080, 1, 2)); + lnm_http_loop_run(hl, 8080, 1); } diff --git a/include/lnm/http/loop.h b/include/lnm/http/loop.h index fefdf6a..8e36caa 100644 --- a/include/lnm/http/loop.h +++ b/include/lnm/http/loop.h @@ -32,16 +32,23 @@ lnm_err lnm_http_loop_init(lnm_http_loop **out, void *c_gctx, lnm_http_ctx_reset_fn ctx_reset, lnm_http_ctx_free_fn ctx_free); +/** + * Initialize a new step. + * + * @param out where to store pointer to new `lnm_http_step` + * @param fn step function + */ +lnm_err lnm_http_step_init(lnm_http_step **out, lnm_http_step_fn fn); + /** * Append the given step fn to the step. * - * @param out both the previous step to append the new step to, and the output - * variable to which the new step is appended + * @param out where to store pointer to new `lnm_http_step` + * @param step step to append new step to * @param fn step function - * @param blocking whether the step is blocking or not */ -lnm_err lnm_http_step_append(lnm_http_step **out, lnm_http_step_fn fn, - bool blocking); +lnm_err lnm_http_step_append(lnm_http_step **out, lnm_http_step *step, + lnm_http_step_fn fn); /** * Initialize a new route of type literal. @@ -74,8 +81,7 @@ lnm_err lnm_http_route_init_regex(lnm_http_route **out, lnm_http_method method, */ lnm_err lnm_http_loop_route_add(lnm_http_loop *hl, lnm_http_route *route); -lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port, - size_t epoll_threads, size_t worker_threads); +lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port, int thread_count); void lnm_http_loop_set_api_key(lnm_http_loop *hl, const char *api_key); diff --git a/include/lnm/loop.h b/include/lnm/loop.h index b922c1f..7ca372c 100644 --- a/include/lnm/loop.h +++ b/include/lnm/loop.h @@ -1,7 +1,6 @@ #ifndef LNM_LOOP #define LNM_LOOP -#include #include #include #include @@ -9,19 +8,13 @@ #include "lnm/common.h" #define LNM_LOOP_BUF_SIZE 2048 -#define LNM_QUEUE_MULTIPLIER 8 -typedef enum lnm_loop_state { - lnm_loop_state_req_io = 0, - lnm_loop_state_res_io, +typedef enum { + lnm_loop_state_req = 0, + lnm_loop_state_res, lnm_loop_state_end, - lnm_loop_state_req_work, - lnm_loop_state_res_work, } lnm_loop_state; -/** - * State for a currently active connection - */ typedef struct lnm_loop_conn { int fd; lnm_loop_state state; @@ -37,38 +30,6 @@ typedef struct lnm_loop_conn { } w; } lnm_loop_conn; -/** - * Concurrent fixed-size queue used to distribute work among worker threads - */ -typedef struct lnm_loop_queue { - struct { - lnm_loop_conn **arr; - size_t len; - } buf; - size_t head; - size_t tail; - bool empty; - pthread_mutex_t mutex; - pthread_cond_t cond; -} lnm_loop_queue; - -/** - * Initialize a new queue with the specified fixed capacity - */ -lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap); - -/** - * Queue the given connection. If the queue is currently full, this action - * blocks until there is space available. - */ -void lnm_loop_queue_push(lnm_loop_queue *q, lnm_loop_conn *conn); - -/** - * Pop a connection from the queue. This action blocks until a connection is - * available. - */ -lnm_loop_conn *lnm_loop_queue_pop(lnm_loop_queue *q); - typedef struct lnm_loop { int listen_fd; int epoll_fd; @@ -78,13 +39,6 @@ typedef struct lnm_loop { void (*ctx_free)(void *ctx); void (*data_read)(lnm_loop_conn *conn); void (*data_write)(lnm_loop_conn *conn); - lnm_loop_queue *wq; - struct { - // Mutex shared between all threads; used for counting thread IDs - pthread_mutex_t mutex; - size_t worker_count; - size_t epoll_count; - } threads; } lnm_loop; lnm_err lnm_loop_init(lnm_loop **out, void *gctx, @@ -95,46 +49,6 @@ lnm_err lnm_loop_init(lnm_loop **out, void *gctx, lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port); -/** - * Run a single epoll thread of the event loop. - */ -lnm_err lnm_loop_run(lnm_loop *l); - -/** - * Run a multithreaded event loop with the configured number of threads. - */ -lnm_err lnm_loop_run_multi(lnm_loop *l, size_t epoll_threads, - size_t worker_threads); - -/** - * Advance the processing of the given connection. - * - * Behavior of this function depends on both the connection state and whether - * worker threads are enabled. - * - * For IO states, this function will perform network I/O along with executing - * the loop's respective processing functions. - * - * For work states, the respective processing functions are executed without - * performing any network I/O. If no worker queue is present, this function - * performs all blocking work until an I/O or the end state is reached. If there - * is a worker queue present, only one block of work is done before exiting, - * allowing further blocks of work to be scheduled on other worker threads. - * - * If no worker queue is present, this function will only exit once an I/O or - * end state is reached. - */ -void lnm_loop_conn_advance(lnm_loop *l, lnm_loop_conn *conn); - -/** - * Reschedule the given connection, either on the event loop for network IO or - * on a worker thread for blocking work. Connections are terminated as needed. - */ -void lnm_loop_conn_schedule(lnm_loop *l, lnm_loop_conn *conn); - -/** - * Main loop executed on the worker threads. - */ -void lnm_loop_worker_run(void *arg); +lnm_err lnm_loop_run(lnm_loop *l, int thread_count); #endif diff --git a/src/_include/lnm/http/loop_internal.h b/src/_include/lnm/http/loop_internal.h index a343de6..4d9e865 100644 --- a/src/_include/lnm/http/loop_internal.h +++ b/src/_include/lnm/http/loop_internal.h @@ -8,7 +8,6 @@ typedef struct lnm_http_step { lnm_http_step_fn fn; struct lnm_http_step *next; - bool blocking; } lnm_http_step; typedef enum lnm_http_route_type { diff --git a/src/_include/lnm/loop_internal.h b/src/_include/lnm/loop_internal.h index c71f303..a5e70a8 100644 --- a/src/_include/lnm/loop_internal.h +++ b/src/_include/lnm/loop_internal.h @@ -9,6 +9,6 @@ void lnm_loop_conn_free(lnm_loop *l, lnm_loop_conn *conn); lnm_err lnm_loop_accept(lnm_loop *l); -void lnm_loop_conn_advance(lnm_loop *l, lnm_loop_conn *conn); +void lnm_loop_conn_io(lnm_loop *l, lnm_loop_conn *conn); #endif diff --git a/src/http/lnm_http_loop.c b/src/http/lnm_http_loop.c index bd83526..784126e 100644 --- a/src/http/lnm_http_loop.c +++ b/src/http/lnm_http_loop.c @@ -28,8 +28,7 @@ lnm_err lnm_http_loop_init(lnm_http_loop **out, void *c_gctx, return lnm_err_ok; } -lnm_err lnm_http_step_append(lnm_http_step **out, lnm_http_step_fn fn, - bool blocking) { +lnm_err lnm_http_step_init(lnm_http_step **out, lnm_http_step_fn fn) { lnm_http_step *step = calloc(1, sizeof(lnm_http_step)); if (step == NULL) { @@ -37,17 +36,22 @@ lnm_err lnm_http_step_append(lnm_http_step **out, lnm_http_step_fn fn, } step->fn = fn; - step->blocking = blocking; - - if ((*out) != NULL) { - (*out)->next = step; - } - *out = step; return lnm_err_ok; } +lnm_err lnm_http_step_append(lnm_http_step **out, lnm_http_step *step, + lnm_http_step_fn fn) { + LNM_RES(lnm_http_step_init(out, fn)); + + if (step != NULL) { + step->next = *out; + } + + return lnm_err_ok; +} + lnm_err lnm_http_route_init(lnm_http_route **out) { lnm_http_route *route = calloc(1, sizeof(lnm_http_route)); @@ -118,10 +122,9 @@ lnm_err lnm_http_loop_route_add(lnm_http_loop *hl, lnm_http_route *route) { return lnm_err_ok; } -lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port, - size_t epoll_threads, size_t worker_threads) { +lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port, int thread_count) { LNM_RES(lnm_loop_setup(hl, port)); - return lnm_loop_run_multi(hl, epoll_threads, worker_threads); + return lnm_loop_run(hl, thread_count); } void lnm_http_loop_set_api_key(lnm_http_loop *hl, const char *api_key) { diff --git a/src/http/lnm_http_loop_process.c b/src/http/lnm_http_loop_process.c index 1e465cf..e16e18c 100644 --- a/src/http/lnm_http_loop_process.c +++ b/src/http/lnm_http_loop_process.c @@ -133,19 +133,11 @@ void lnm_http_loop_process_steps(lnm_http_conn *conn) { while ((ctx->cur_step != NULL) && (step != ctx->cur_step)) { step = ctx->cur_step; - if (step->blocking && (conn->state != lnm_loop_state_req_work)) { - conn->state = lnm_loop_state_req_work; - - break; - } - switch (step->fn(conn)) { case lnm_http_step_err_done: ctx->cur_step = ctx->cur_step->next; break; case lnm_http_step_err_io_needed: - // Ensure steps that require more I/O are executed on the event loop - conn->state = lnm_loop_state_req_io; break; case lnm_http_step_err_close: conn->state = lnm_loop_state_end; @@ -157,7 +149,6 @@ void lnm_http_loop_process_steps(lnm_http_conn *conn) { } if (ctx->cur_step == NULL) { - conn->state = lnm_loop_state_res_io; ctx->state = lnm_http_loop_state_add_headers; } } @@ -334,23 +325,23 @@ void (*process_fns[])(lnm_http_conn *conn) = { lnm_loop_state state_map[] = { // parse_req - lnm_loop_state_req_io, + lnm_loop_state_req, // route - lnm_loop_state_req_io, + lnm_loop_state_req, // parse_headers - lnm_loop_state_req_io, + lnm_loop_state_req, // steps - lnm_loop_state_req_io, + lnm_loop_state_req, // add_headers - lnm_loop_state_req_io, + lnm_loop_state_req, // write_status_line - lnm_loop_state_res_io, + lnm_loop_state_res, // write_headers - lnm_loop_state_res_io, + lnm_loop_state_res, // write_body - lnm_loop_state_res_io, + lnm_loop_state_res, // finish - lnm_loop_state_res_io, + lnm_loop_state_res, }; void lnm_http_loop_process(lnm_http_conn *conn) { @@ -378,7 +369,7 @@ void lnm_http_loop_process(lnm_http_conn *conn) { // We move the request to a dedicated buffer if the read buffer needs to be // reused - if ((conn->state == lnm_loop_state_req_io) && (conn->state == loop_state) && + if ((conn->state == lnm_loop_state_req) && (conn->state == loop_state) && (!ctx->req.buf.owned) && (ctx->req.buf.len > 0)) { char *buf = malloc(ctx->req.buf.len); diff --git a/src/loop/lnm_loop.c b/src/loop/lnm_loop.c index 5154b1e..6c9a854 100644 --- a/src/loop/lnm_loop.c +++ b/src/loop/lnm_loop.c @@ -30,8 +30,6 @@ lnm_err lnm_loop_init(lnm_loop **out, void *gctx, l->data_read = data_read; l->data_write = data_write; - pthread_mutex_init(&l->threads.mutex, NULL); - *out = l; return lnm_err_ok; @@ -55,7 +53,7 @@ lnm_err lnm_loop_accept(lnm_loop *l) { LNM_RES2(lnm_loop_conn_init(&conn, l), close(conn_fd)); conn->fd = conn_fd; - conn->state = lnm_loop_state_req_io; + conn->state = lnm_loop_state_req; struct epoll_event event = {.data.ptr = conn, .events = EPOLLIN | EPOLLET | EPOLLONESHOT}; @@ -64,10 +62,6 @@ lnm_err lnm_loop_accept(lnm_loop *l) { l->open++; - // Make sure to re-arm the listening socket after accepting - event.data.ptr = NULL; - epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, l->listen_fd, &event); - lnm_ldebug(section, "connection opened with fd %i", conn_fd); return lnm_err_ok; @@ -129,51 +123,19 @@ lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port) { return lnm_err_ok; } -void lnm_loop_conn_schedule(lnm_loop *l, lnm_loop_conn *conn) { - switch (conn->state) { - // IO states get rescheduled in the epoll loop - case lnm_loop_state_req_io: - case lnm_loop_state_res_io: { - struct epoll_event event = { - .data.ptr = conn, - .events = (conn->state == lnm_loop_state_req_io ? EPOLLIN : EPOLLOUT) | - EPOLLET | EPOLLONESHOT}; +typedef struct lnm_loop_thread_args { + lnm_loop *l; + int id; + int thread_count; +} lnm_loop_thread_args; - epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, conn->fd, &event); - } break; - case lnm_loop_state_req_work: - case lnm_loop_state_res_work: - lnm_loop_queue_push(l->wq, conn); - break; - case lnm_loop_state_end: { - int conn_fd = conn->fd; - - lnm_loop_conn_free(l, conn); - close(conn_fd); - l->open--; - - epoll_ctl(l->epoll_fd, EPOLL_CTL_DEL, conn_fd, NULL); - - lnm_ldebug(section, "connection closed with fd %i", conn_fd); - } break; - } -} - -lnm_err lnm_loop_run(lnm_loop *l) { - if (l->epoll_fd == 0) { - return lnm_err_not_setup; - } - - // Get thread ID by incrementing counter - pthread_mutex_lock(&l->threads.mutex); - - int thread_id = l->threads.epoll_count; - l->threads.epoll_count++; - - pthread_mutex_unlock(&l->threads.mutex); +lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) { + lnm_loop *l = args->l; + int thread_id = args->id; + int thread_count = args->thread_count; struct epoll_event *events = calloc(1, sizeof(struct epoll_event)); - size_t events_cap = 1; + int events_cap = 1; if (events == NULL) { return lnm_err_failed_alloc; @@ -181,6 +143,9 @@ lnm_err lnm_loop_run(lnm_loop *l) { lnm_linfo(section, "thread %i started", thread_id); + struct epoll_event listen_event = { + .data.ptr = NULL, .events = EPOLLIN | EPOLLET | EPOLLONESHOT}; + while (1) { int polled = epoll_wait(l->epoll_fd, events, events_cap, -1); lnm_ldebug(section, "polled (thread %i): %i", thread_id, polled); @@ -192,19 +157,36 @@ lnm_err lnm_loop_run(lnm_loop *l) { for (int i = 0; i < polled; i++) { if (events[i].data.ptr == NULL) { lnm_loop_accept(l); + + epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, l->listen_fd, &listen_event); } else { lnm_loop_conn *conn = events[i].data.ptr; + lnm_loop_conn_io(l, conn); - // At this point, state is always an IO state - lnm_loop_conn_advance(l, conn); - lnm_loop_conn_schedule(l, conn); + if (conn->state == lnm_loop_state_end) { + int conn_fd = conn->fd; + + lnm_loop_conn_free(l, conn); + close(conn_fd); + l->open--; + + epoll_ctl(l->epoll_fd, EPOLL_CTL_DEL, conn_fd, NULL); + + lnm_ldebug(section, "connection closed with fd %i", conn_fd); + } else { + struct epoll_event event = { + .data.ptr = conn, + .events = + (conn->state == lnm_loop_state_req ? EPOLLIN : EPOLLOUT) | + EPOLLET | EPOLLONESHOT}; + epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, conn->fd, &event); + } } } - size_t open = l->open; - size_t cap_per_thread = open + 1 > l->threads.epoll_count - ? (open + 1) / l->threads.epoll_count - : 1; + int open = l->open; + int cap_per_thread = + open + 1 > thread_count ? (open + 1) / thread_count : 1; if (cap_per_thread > events_cap) { struct epoll_event *new_events = @@ -223,21 +205,28 @@ lnm_err lnm_loop_run(lnm_loop *l) { return lnm_err_ok; } -lnm_err lnm_loop_run_multi(lnm_loop *l, size_t epoll_threads, - size_t worker_threads) { - if (worker_threads > 0) { - LNM_RES(lnm_loop_queue_init(&l->wq, LNM_QUEUE_MULTIPLIER * worker_threads)); +lnm_err lnm_loop_run(lnm_loop *l, int thread_count) { + if (l->epoll_fd == 0) { + return lnm_err_not_setup; } - pthread_t t; + lnm_loop_thread_args args[thread_count]; - for (size_t i = 1; i < epoll_threads; i++) { - pthread_create(&t, NULL, (void *(*)(void *))lnm_loop_run, l); + for (int i = 1; i < thread_count; i++) { + args[i].l = l; + args[i].id = i; + args[i].thread_count = thread_count; + + pthread_t thread; + pthread_create(&thread, NULL, (void *(*)(void *))lnm_loop_run_thread, + &args[i]); } - for (size_t i = 0; i < worker_threads; i++) { - pthread_create(&t, NULL, (void *(*)(void *))lnm_loop_worker_run, l); - } + args[0].l = l; + args[0].id = 0; + args[0].thread_count = thread_count; - return lnm_loop_run(l); + lnm_loop_run_thread(&args[0]); + + return lnm_err_ok; } diff --git a/src/loop/lnm_loop_io.c b/src/loop/lnm_loop_io.c index 58f5577..363d6af 100644 --- a/src/loop/lnm_loop_io.c +++ b/src/loop/lnm_loop_io.c @@ -1,6 +1,5 @@ #include #include -#include #include #include "lnm/loop.h" @@ -34,7 +33,7 @@ void lnm_loop_conn_io_req(lnm_loop *l, lnm_loop_conn *conn) { conn->r.size += res; l->data_read(conn); - } while (conn->state == lnm_loop_state_req_io); + } while (conn->state == lnm_loop_state_req); } void lnm_loop_conn_io_res(lnm_loop *l, lnm_loop_conn *conn) { @@ -44,9 +43,7 @@ void lnm_loop_conn_io_res(lnm_loop *l, lnm_loop_conn *conn) { ssize_t res; do { - // Send with MSG_NOSIGNAL prevents closed pipes from exiting the program - // with SIGPIPE - res = send(conn->fd, conn->w.buf, conn->w.size, MSG_NOSIGNAL); + res = write(conn->fd, conn->w.buf, conn->w.size); } while (res < 0 && errno == EINTR); // Write can't be performed without blocking; we come back later @@ -64,33 +61,17 @@ void lnm_loop_conn_io_res(lnm_loop *l, lnm_loop_conn *conn) { // writer function more space to work with memmove(conn->w.buf, &conn->w.buf[res], conn->w.size - res); conn->w.size -= res; - } while (conn->state == lnm_loop_state_res_io); + } while (conn->state == lnm_loop_state_res); } -void lnm_loop_conn_advance(lnm_loop *l, lnm_loop_conn *conn) { - do { - - switch (conn->state) { - case lnm_loop_state_req_io: - lnm_loop_conn_io_req(l, conn); - break; - case lnm_loop_state_res_io: - lnm_loop_conn_io_res(l, conn); - break; - case lnm_loop_state_req_work: - do { - l->data_read(conn); - } while (conn->state == lnm_loop_state_req_work); - break; - case lnm_loop_state_res_work: - do { - l->data_write(conn); - } while (conn->state == lnm_loop_state_res_work); - break; - default:; - } +void lnm_loop_conn_io(lnm_loop *l, lnm_loop_conn *conn) { + switch (conn->state) { + case lnm_loop_state_req: + lnm_loop_conn_io_req(l, conn); + break; + case lnm_loop_state_res: + lnm_loop_conn_io_res(l, conn); + break; + default:; } - // Execute all blocking work if we're running in single-threaded mode - while (l->wq == NULL && (conn->state == lnm_loop_state_req_work || - conn->state == lnm_loop_state_res_work)); } diff --git a/src/loop/lnm_loop_worker.c b/src/loop/lnm_loop_worker.c deleted file mode 100644 index fba4848..0000000 --- a/src/loop/lnm_loop_worker.c +++ /dev/null @@ -1,92 +0,0 @@ -#include - -#include "lnm/log.h" -#include "lnm/loop.h" - -lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap) { - lnm_loop_conn **arr = calloc(cap, sizeof(lnm_loop_conn *)); - - if (arr == NULL) { - return lnm_err_failed_alloc; - } - - lnm_loop_queue *q = calloc(1, sizeof(lnm_loop_queue)); - - if (q == NULL) { - free(arr); - - return lnm_err_failed_alloc; - } - - q->buf.arr = arr; - q->buf.len = cap; - - q->tail = 0; - q->head = 0; - q->empty = true; - - pthread_mutex_init(&q->mutex, NULL); - pthread_cond_init(&q->cond, NULL); - - *out = q; - - return lnm_err_ok; -} - -void lnm_loop_queue_push(lnm_loop_queue *q, lnm_loop_conn *conn) { - pthread_mutex_lock(&q->mutex); - - while (q->head == q->tail && !q->empty) { - pthread_cond_wait(&q->cond, &q->mutex); - } - - q->buf.arr[q->head] = conn; - - // Make sure the index wraps around - q->head = (q->head + 1) % q->buf.len; - q->empty = false; - - // Unlock mutex and signal to waiting threads - pthread_mutex_unlock(&q->mutex); - pthread_cond_signal(&q->cond); -} - -lnm_loop_conn *lnm_loop_queue_pop(lnm_loop_queue *q) { - pthread_mutex_lock(&q->mutex); - - while (q->empty) { - pthread_cond_wait(&q->cond, &q->mutex); - } - - lnm_loop_conn *out = q->buf.arr[q->tail]; - - q->tail = (q->tail + 1) % q->buf.len; - q->empty = q->tail == q->head; - - // Unlock mutex and signal to waiting threads - pthread_mutex_unlock(&q->mutex); - pthread_cond_signal(&q->cond); - - return out; -} - -void lnm_loop_worker_run(void *arg) { - lnm_loop *l = arg; - lnm_loop_queue *q = l->wq; - - // Get thread ID by incrementing counter - pthread_mutex_lock(&l->threads.mutex); - - int thread_id = l->threads.worker_count; - l->threads.worker_count++; - - pthread_mutex_unlock(&l->threads.mutex); - - while (1) { - lnm_loop_conn *conn = lnm_loop_queue_pop(q); - lnm_ldebug("loop", "worker %i processing fd %i", thread_id, conn->fd); - - lnm_loop_conn_advance(l, conn); - lnm_loop_conn_schedule(l, conn); - } -}