From d53a9499466fffa70e202ea8d705c7ba9d86cc65 Mon Sep 17 00:00:00 2001 From: Chewing_Bever Date: Tue, 12 Dec 2023 22:22:55 +0100 Subject: [PATCH] feat(lnm): switch to epoll --- CHANGELOG.md | 2 + lnm/include/lnm/http/loop.h | 2 +- lnm/include/lnm/loop.h | 11 +- lnm/src/http/lnm_http_loop.c | 4 +- lnm/src/http/lnm_http_loop_process.c | 4 +- lnm/src/loop/lnm_loop.c | 182 +++++++++++++++------------ src/lander/lander_get.c | 5 +- src/main.c | 2 +- 8 files changed, 117 insertions(+), 95 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 92635f8..6a64beb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -18,6 +18,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 * Better API for adding routes * State machine HTTP loop * Automatically support HEAD requests for all GET requests + * Event loop uses `epoll` instead of `poll` + * Configurable multithreading using `epoll` * Landerctl * `-c` flag to use custom config file (useful for testing) diff --git a/lnm/include/lnm/http/loop.h b/lnm/include/lnm/http/loop.h index 071e22a..8e36caa 100644 --- a/lnm/include/lnm/http/loop.h +++ b/lnm/include/lnm/http/loop.h @@ -81,7 +81,7 @@ lnm_err lnm_http_route_init_regex(lnm_http_route **out, lnm_http_method method, */ lnm_err lnm_http_loop_route_add(lnm_http_loop *hl, lnm_http_route *route); -lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port); +lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port, int thread_count); void lnm_http_loop_set_api_key(lnm_http_loop *hl, const char *api_key); diff --git a/lnm/include/lnm/loop.h b/lnm/include/lnm/loop.h index 6e67f68..7ca372c 100644 --- a/lnm/include/lnm/loop.h +++ b/lnm/include/lnm/loop.h @@ -1,13 +1,13 @@ #ifndef LNM_LOOP #define LNM_LOOP +#include #include #include #include "lnm/common.h" #define LNM_LOOP_BUF_SIZE 2048 -#define LNM_LOOP_INITIAL_CONNS 16 typedef enum { lnm_loop_state_req = 0, @@ -32,11 +32,8 @@ typedef struct lnm_loop_conn { typedef struct lnm_loop { int listen_fd; - struct { - lnm_loop_conn **arr; - size_t len; - size_t open; - } conns; + int epoll_fd; + atomic_int open; void *gctx; lnm_err (*ctx_init)(void **out, void *gctx); void (*ctx_free)(void *ctx); @@ -52,6 +49,6 @@ lnm_err lnm_loop_init(lnm_loop **out, void *gctx, lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port); -lnm_err lnm_loop_run(lnm_loop *l); +lnm_err lnm_loop_run(lnm_loop *l, int thread_count); #endif diff --git a/lnm/src/http/lnm_http_loop.c b/lnm/src/http/lnm_http_loop.c index c226b29..784126e 100644 --- a/lnm/src/http/lnm_http_loop.c +++ b/lnm/src/http/lnm_http_loop.c @@ -122,9 +122,9 @@ lnm_err lnm_http_loop_route_add(lnm_http_loop *hl, lnm_http_route *route) { return lnm_err_ok; } -lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port) { +lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port, int thread_count) { LNM_RES(lnm_loop_setup(hl, port)); - return lnm_loop_run(hl); + return lnm_loop_run(hl, thread_count); } void lnm_http_loop_set_api_key(lnm_http_loop *hl, const char *api_key) { diff --git a/lnm/src/http/lnm_http_loop_process.c b/lnm/src/http/lnm_http_loop_process.c index 62cea3f..63017f1 100644 --- a/lnm/src/http/lnm_http_loop_process.c +++ b/lnm/src/http/lnm_http_loop_process.c @@ -29,8 +29,8 @@ void lnm_http_loop_process_parse_req(lnm_http_conn *conn) { ctx->state = lnm_http_loop_state_route; lnm_linfo(section, "%s %.*s HTTP/1.%i", - lnm_http_method_names[ctx->req.method], (int)ctx->req.path.len, - ctx->req.path.s, ctx->req.minor_version); + lnm_http_method_names[ctx->req.method], (int)ctx->req.path.len, + ctx->req.path.s, ctx->req.minor_version); break; case lnm_http_parse_err_incomplete: // If the request is already the size of the read buffer, we close the diff --git a/lnm/src/loop/lnm_loop.c b/lnm/src/loop/lnm_loop.c index 4126eb0..6c9a854 100644 --- a/lnm/src/loop/lnm_loop.c +++ b/lnm/src/loop/lnm_loop.c @@ -1,12 +1,14 @@ #include #include -#include +#include #include #include +#include #include #include "lnm/common.h" #include "lnm/log.h" +#include "lnm/loop.h" #include "lnm/loop_internal.h" static const char *section = "loop"; @@ -47,35 +49,18 @@ lnm_err lnm_loop_accept(lnm_loop *l) { flags |= O_NONBLOCK; fcntl(conn_fd, F_SETFL, flags); - // Append connection to list of connections - if ((size_t)conn_fd >= l->conns.len) { - // We always calloc as a realloc might introduce unitialized values in the - // array - lnm_loop_conn **new = calloc(sizeof(lnm_loop_conn *), conn_fd + 1); - - if (new == NULL) { - close(conn_fd); - - return lnm_err_failed_alloc; - } - - if (l->conns.len > 0) { - memcpy(new, l->conns.arr, l->conns.len * sizeof(lnm_loop_conn *)); - free(l->conns.arr); - } - - l->conns.arr = new; - l->conns.len = conn_fd + 1; - } - lnm_loop_conn *conn; LNM_RES2(lnm_loop_conn_init(&conn, l), close(conn_fd)); - l->conns.arr[conn_fd] = conn; conn->fd = conn_fd; conn->state = lnm_loop_state_req; - l->conns.open++; + struct epoll_event event = {.data.ptr = conn, + .events = EPOLLIN | EPOLLET | EPOLLONESHOT}; + + epoll_ctl(l->epoll_fd, EPOLL_CTL_ADD, conn_fd, &event); + + l->open++; lnm_ldebug(section, "connection opened with fd %i", conn_fd); @@ -115,96 +100,133 @@ lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port) { int flags = fcntl(listen_fd, F_GETFL); fcntl(listen_fd, F_SETFL, flags | O_NONBLOCK); + int epoll_fd = epoll_create1(0); + + if (epoll_fd < 0) { + return lnm_err_failed_network; + } + + struct epoll_event event = { + // The listening socket is marked using a NULL data field + .data.ptr = NULL, + .events = EPOLLIN | EPOLLET | EPOLLONESHOT}; + + res = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &event); + + if (res < 0) { + return lnm_err_failed_network; + } + l->listen_fd = listen_fd; + l->epoll_fd = epoll_fd; return lnm_err_ok; } -lnm_err lnm_loop_run(lnm_loop *l) { - if (l->listen_fd == 0) { - return lnm_err_not_setup; - } +typedef struct lnm_loop_thread_args { + lnm_loop *l; + int id; + int thread_count; +} lnm_loop_thread_args; - struct pollfd *poll_args = - calloc(LNM_LOOP_INITIAL_CONNS + 1, sizeof(struct pollfd)); - size_t poll_args_cap = LNM_LOOP_INITIAL_CONNS + 1; +lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) { + lnm_loop *l = args->l; + int thread_id = args->id; + int thread_count = args->thread_count; - if (poll_args == NULL) { + struct epoll_event *events = calloc(1, sizeof(struct epoll_event)); + int events_cap = 1; + + if (events == NULL) { return lnm_err_failed_alloc; } - // First argument is listening socket - poll_args[0].fd = l->listen_fd; - poll_args[0].events = POLLIN; + lnm_linfo(section, "thread %i started", thread_id); - lnm_linfo(section, "started on fd %i", l->listen_fd); + struct epoll_event listen_event = { + .data.ptr = NULL, .events = EPOLLIN | EPOLLET | EPOLLONESHOT}; while (1) { - nfds_t poll_args_len = 1; - - // Add all open connections to the poll command - for (size_t i = 0; i < l->conns.len && poll_args_len < l->conns.open + 1; - i++) { - const lnm_loop_conn *conn = l->conns.arr[i]; - - if (conn == NULL) { - continue; - } - - poll_args[poll_args_len].fd = conn->fd; - poll_args[poll_args_len].events = - ((conn->state == lnm_loop_state_req) ? POLLIN : POLLOUT) | POLLERR; - - poll_args_len++; - } - - int polled = poll(poll_args, poll_args_len, -1); + int polled = epoll_wait(l->epoll_fd, events, events_cap, -1); + lnm_ldebug(section, "polled (thread %i): %i", thread_id, polled); if (polled < 0) { return lnm_err_failed_poll; } - if (poll_args[0].revents) { - lnm_loop_accept(l); - polled--; - } - - for (size_t i = 1; i < poll_args_len && polled > 0; i++) { - if (poll_args[i].revents) { - lnm_loop_conn *conn = l->conns.arr[poll_args[i].fd]; + for (int i = 0; i < polled; i++) { + if (events[i].data.ptr == NULL) { + lnm_loop_accept(l); + epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, l->listen_fd, &listen_event); + } else { + lnm_loop_conn *conn = events[i].data.ptr; lnm_loop_conn_io(l, conn); if (conn->state == lnm_loop_state_end) { - l->conns.arr[conn->fd] = NULL; - close(conn->fd); - - l->conns.open--; - lnm_ldebug(section, "connection closed with fd %i", conn->fd); + int conn_fd = conn->fd; lnm_loop_conn_free(l, conn); - } + close(conn_fd); + l->open--; - polled--; + epoll_ctl(l->epoll_fd, EPOLL_CTL_DEL, conn_fd, NULL); + + lnm_ldebug(section, "connection closed with fd %i", conn_fd); + } else { + struct epoll_event event = { + .data.ptr = conn, + .events = + (conn->state == lnm_loop_state_req ? EPOLLIN : EPOLLOUT) | + EPOLLET | EPOLLONESHOT}; + epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, conn->fd, &event); + } } } - if (poll_args_cap < l->conns.open + 1) { - struct pollfd *buf = calloc(l->conns.open + 1, sizeof(struct pollfd)); + int open = l->open; + int cap_per_thread = + open + 1 > thread_count ? (open + 1) / thread_count : 1; - if (buf == NULL) { + if (cap_per_thread > events_cap) { + struct epoll_event *new_events = + malloc(cap_per_thread * sizeof(struct epoll_event)); + + if (new_events == NULL) { return lnm_err_failed_alloc; } - buf[0].fd = l->listen_fd; - buf[0].events = POLLIN; - - free(poll_args); - poll_args = buf; - - poll_args_cap = l->conns.open + 1; + free(events); + events = new_events; + events_cap = cap_per_thread; } } return lnm_err_ok; } + +lnm_err lnm_loop_run(lnm_loop *l, int thread_count) { + if (l->epoll_fd == 0) { + return lnm_err_not_setup; + } + + lnm_loop_thread_args args[thread_count]; + + for (int i = 1; i < thread_count; i++) { + args[i].l = l; + args[i].id = i; + args[i].thread_count = thread_count; + + pthread_t thread; + pthread_create(&thread, NULL, (void *(*)(void *))lnm_loop_run_thread, + &args[i]); + } + + args[0].l = l; + args[0].id = 0; + args[0].thread_count = thread_count; + + lnm_loop_run_thread(&args[0]); + + return lnm_err_ok; +} diff --git a/src/lander/lander_get.c b/src/lander/lander_get.c index 24e93ba..0e90086 100644 --- a/src/lander/lander_get.c +++ b/src/lander/lander_get.c @@ -3,8 +3,8 @@ #include "lnm/http/consts.h" #include "lnm/http/loop.h" -#include "lnm/loop.h" #include "lnm/log.h" +#include "lnm/loop.h" #include "lsm/store.h" #include "lander.h" @@ -40,7 +40,8 @@ lnm_http_step_err lander_get_redirect(lnm_http_conn *conn) { // This shouldn't be able to happen if (lsm_entry_attr_get(&url_attr_val, c_ctx->entry, lander_attr_type_url) != lsm_error_ok) { - lnm_lerror("lander", "%s", "Entry of type redirect detected without URL attribute"); + lnm_lerror("lander", "%s", + "Entry of type redirect detected without URL attribute"); ctx->res.status = lnm_http_status_internal_server_error; lsm_entry_close(c_ctx->entry); diff --git a/src/main.c b/src/main.c index d25aec3..6cacf84 100644 --- a/src/main.c +++ b/src/main.c @@ -104,7 +104,7 @@ int main() { lnm_linfo("main", "Store loaded containing %lu entries", lsm_store_size(c_gctx->store)); lnm_http_loop *hl = loop_init(c_gctx, api_key); - lnm_http_loop_run(hl, port); + lnm_http_loop_run(hl, port, 1); /* http_loop *hl = http_loop_init( */ /* lander_routes, sizeof(lander_routes) / sizeof(lander_routes[0]),