From 3490c2dd4202e12702526d049858e48d909a3832 Mon Sep 17 00:00:00 2001 From: Chewing_Bever Date: Thu, 1 Feb 2024 11:15:34 +0100 Subject: [PATCH 1/7] feat: implement concurrent worker queue --- include/lnm/loop.h | 32 ++++++++++++++++++++ src/loop/lnm_loop_worker.c | 61 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 93 insertions(+) create mode 100644 src/loop/lnm_loop_worker.c diff --git a/include/lnm/loop.h b/include/lnm/loop.h index 7ca372c..4c33aa5 100644 --- a/include/lnm/loop.h +++ b/include/lnm/loop.h @@ -1,6 +1,7 @@ #ifndef LNM_LOOP #define LNM_LOOP +#include #include #include #include @@ -51,4 +52,35 @@ lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port); lnm_err lnm_loop_run(lnm_loop *l, int thread_count); +/** + * Concurrent fixed-size queue used by the worked threads to distribute tasks + */ +typedef struct lnm_loop_queue { + struct { + lnm_loop_conn **arr; + size_t len; + } buf; + size_t head; + size_t tail; + pthread_mutex_t mutex; + pthread_cond_t cond; +} lnm_loop_queue; + +/** + * Initialize a new queue with the specified fixed capacity + */ +lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap); + +/** + * Queue the given connection. If the queue is currently full, this action + * blocks until there is space available. + */ +void lnm_loop_queue_push(lnm_loop_queue *q, lnm_loop_conn *conn); + +/** + * Pop a connection from the queue. This action blocks until a connection is + * available. + */ +lnm_loop_conn *lnm_loop_queue_pop(lnm_loop_queue *q); + #endif diff --git a/src/loop/lnm_loop_worker.c b/src/loop/lnm_loop_worker.c new file mode 100644 index 0000000..067f1e8 --- /dev/null +++ b/src/loop/lnm_loop_worker.c @@ -0,0 +1,61 @@ +#include "lnm/loop.h" + +lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap) { + lnm_loop_conn **arr = malloc(cap * sizeof(lnm_loop_conn *)); + + if (arr == NULL) { + return lnm_err_failed_alloc; + } + + lnm_loop_queue *q = calloc(1, sizeof(lnm_loop_queue)); + + if (q == NULL) { + free(arr); + + return lnm_err_failed_alloc; + } + + q->buf.arr = arr; + q->buf.len = cap; + + pthread_mutex_init(&q->mutex, NULL); + pthread_cond_init(&q->cond, NULL); + + *out = q; + + return lnm_err_ok; +} + +void lnm_loop_queue_push(lnm_loop_queue *q, lnm_loop_conn *conn) { + pthread_mutex_lock(&q->mutex); + + while (q->tail == q->head) { + pthread_cond_wait(&q->cond, &q->mutex); + } + + q->buf.arr[q->tail] = conn; + + // Make sure the index wraps around + q->tail = (q->tail + 1) % q->buf.len; + + // Unlock mutex and signal to waiting threads + pthread_mutex_unlock(&q->mutex); + pthread_cond_signal(&q->cond); +} + +lnm_loop_conn *lnm_loop_queue_pop(lnm_loop_queue *q) { + pthread_mutex_lock(&q->mutex); + + while (q->tail == q->head) { + pthread_cond_wait(&q->cond, &q->mutex); + } + + lnm_loop_conn *out = q->buf.arr[q->head]; + q->head = (q->head + 1) % q->buf.len; + + // Unlock mutex and signal to waiting threads + pthread_mutex_unlock(&q->mutex); + pthread_cond_signal(&q->cond); + + return out; +} From ccb9b77cc854540ef3279f0c3af9fb704e3c7efa Mon Sep 17 00:00:00 2001 From: Chewing_Bever Date: Thu, 1 Feb 2024 14:00:03 +0100 Subject: [PATCH 2/7] feat: further lay groundwork for worker threads --- include/lnm/loop.h | 63 ++++++++++++++++++++++++-------------- src/loop/lnm_loop.c | 61 +++++++++++++++++++++--------------- src/loop/lnm_loop_worker.c | 23 ++++++++++++++ 3 files changed, 100 insertions(+), 47 deletions(-) diff --git a/include/lnm/loop.h b/include/lnm/loop.h index 4c33aa5..4f98349 100644 --- a/include/lnm/loop.h +++ b/include/lnm/loop.h @@ -10,12 +10,17 @@ #define LNM_LOOP_BUF_SIZE 2048 -typedef enum { +typedef enum lnm_loop_state { lnm_loop_state_req = 0, lnm_loop_state_res, lnm_loop_state_end, + lnm_loop_state_req_blocking, + lnm_loop_state_res_blocking, } lnm_loop_state; +/** + * State for a currently active connection + */ typedef struct lnm_loop_conn { int fd; lnm_loop_state state; @@ -31,29 +36,8 @@ typedef struct lnm_loop_conn { } w; } lnm_loop_conn; -typedef struct lnm_loop { - int listen_fd; - int epoll_fd; - atomic_int open; - void *gctx; - lnm_err (*ctx_init)(void **out, void *gctx); - void (*ctx_free)(void *ctx); - void (*data_read)(lnm_loop_conn *conn); - void (*data_write)(lnm_loop_conn *conn); -} lnm_loop; - -lnm_err lnm_loop_init(lnm_loop **out, void *gctx, - lnm_err (*ctx_init)(void **out, void *gctx), - void (*ctx_free)(void *ctx), - void (*data_read)(lnm_loop_conn *conn), - void (*data_write)(lnm_loop_conn *conn)); - -lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port); - -lnm_err lnm_loop_run(lnm_loop *l, int thread_count); - /** - * Concurrent fixed-size queue used by the worked threads to distribute tasks + * Concurrent fixed-size queue used to distribute work among worker threads */ typedef struct lnm_loop_queue { struct { @@ -83,4 +67,37 @@ void lnm_loop_queue_push(lnm_loop_queue *q, lnm_loop_conn *conn); */ lnm_loop_conn *lnm_loop_queue_pop(lnm_loop_queue *q); +typedef struct lnm_loop { + int listen_fd; + int epoll_fd; + atomic_int open; + void *gctx; + lnm_err (*ctx_init)(void **out, void *gctx); + void (*ctx_free)(void *ctx); + void (*data_read)(lnm_loop_conn *conn); + void (*data_write)(lnm_loop_conn *conn); + lnm_loop_queue *wq; +} lnm_loop; + +lnm_err lnm_loop_init(lnm_loop **out, void *gctx, + lnm_err (*ctx_init)(void **out, void *gctx), + void (*ctx_free)(void *ctx), + void (*data_read)(lnm_loop_conn *conn), + void (*data_write)(lnm_loop_conn *conn)); + +lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port); + +lnm_err lnm_loop_run(lnm_loop *l, int thread_count); + +/** + * Reschedule the given connection, either on the event loop for network IO or + * on a worker thread for blocking work. Connections are terminated as needed. + */ +void lnm_loop_conn_schedule(lnm_loop *l, lnm_loop_conn *conn); + +/** + * Main loop executed on the worker threads. + */ +void lnm_loop_worker_run(void *arg); + #endif diff --git a/src/loop/lnm_loop.c b/src/loop/lnm_loop.c index 6c9a854..5379ecc 100644 --- a/src/loop/lnm_loop.c +++ b/src/loop/lnm_loop.c @@ -62,6 +62,10 @@ lnm_err lnm_loop_accept(lnm_loop *l) { l->open++; + // Make sure to re-arm the listening socket after accepting + event.data.ptr = NULL; + epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, l->listen_fd, &event); + lnm_ldebug(section, "connection opened with fd %i", conn_fd); return lnm_err_ok; @@ -123,6 +127,36 @@ lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port) { return lnm_err_ok; } +void lnm_loop_conn_schedule(lnm_loop *l, lnm_loop_conn *conn) { + switch (conn->state) { + // IO states get rescheduled in the epoll loop + case lnm_loop_state_req: + case lnm_loop_state_res: { + struct epoll_event event = { + .data.ptr = conn, + .events = (conn->state == lnm_loop_state_req ? EPOLLIN : EPOLLOUT) | + EPOLLET | EPOLLONESHOT}; + + epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, conn->fd, &event); + } break; + case lnm_loop_state_req_blocking: + case lnm_loop_state_res_blocking: + lnm_loop_queue_push(l->wq, conn); + break; + case lnm_loop_state_end: { + int conn_fd = conn->fd; + + lnm_loop_conn_free(l, conn); + close(conn_fd); + l->open--; + + epoll_ctl(l->epoll_fd, EPOLL_CTL_DEL, conn_fd, NULL); + + lnm_ldebug(section, "connection closed with fd %i", conn_fd); + } break; + } +} + typedef struct lnm_loop_thread_args { lnm_loop *l; int id; @@ -143,9 +177,6 @@ lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) { lnm_linfo(section, "thread %i started", thread_id); - struct epoll_event listen_event = { - .data.ptr = NULL, .events = EPOLLIN | EPOLLET | EPOLLONESHOT}; - while (1) { int polled = epoll_wait(l->epoll_fd, events, events_cap, -1); lnm_ldebug(section, "polled (thread %i): %i", thread_id, polled); @@ -157,30 +188,12 @@ lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) { for (int i = 0; i < polled; i++) { if (events[i].data.ptr == NULL) { lnm_loop_accept(l); - - epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, l->listen_fd, &listen_event); } else { lnm_loop_conn *conn = events[i].data.ptr; + + // At this point, state is always an IO state lnm_loop_conn_io(l, conn); - - if (conn->state == lnm_loop_state_end) { - int conn_fd = conn->fd; - - lnm_loop_conn_free(l, conn); - close(conn_fd); - l->open--; - - epoll_ctl(l->epoll_fd, EPOLL_CTL_DEL, conn_fd, NULL); - - lnm_ldebug(section, "connection closed with fd %i", conn_fd); - } else { - struct epoll_event event = { - .data.ptr = conn, - .events = - (conn->state == lnm_loop_state_req ? EPOLLIN : EPOLLOUT) | - EPOLLET | EPOLLONESHOT}; - epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, conn->fd, &event); - } + lnm_loop_conn_schedule(l, conn); } } diff --git a/src/loop/lnm_loop_worker.c b/src/loop/lnm_loop_worker.c index 067f1e8..2f315cb 100644 --- a/src/loop/lnm_loop_worker.c +++ b/src/loop/lnm_loop_worker.c @@ -1,3 +1,5 @@ +#include + #include "lnm/loop.h" lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap) { @@ -59,3 +61,24 @@ lnm_loop_conn *lnm_loop_queue_pop(lnm_loop_queue *q) { return out; } + +void lnm_loop_worker_run(void *arg) { + lnm_loop *l = arg; + lnm_loop_queue *q = l->wq; + + while (1) { + lnm_loop_conn *conn = lnm_loop_queue_pop(q); + + switch (conn->state) { + case lnm_loop_state_req_blocking: + l->data_read(conn); + break; + case lnm_loop_state_res_blocking: + l->data_write(conn); + // Other states shouldn't even end up here, so we ignore them + default:; + } + + lnm_loop_conn_schedule(l, conn); + } +} From 9c01548256a38c53a9ae7a8827be08ee5ae76a89 Mon Sep 17 00:00:00 2001 From: Chewing_Bever Date: Thu, 1 Feb 2024 15:37:35 +0100 Subject: [PATCH 3/7] feat: change lnm loop multithreading api --- example/blocking.c | 2 +- include/lnm/http/loop.h | 3 +- include/lnm/loop.h | 18 +++++++++++- src/http/lnm_http_loop.c | 5 ++-- src/loop/lnm_loop.c | 60 +++++++++++++++++++--------------------- 5 files changed, 52 insertions(+), 36 deletions(-) diff --git a/example/blocking.c b/example/blocking.c index 9b395b4..5d58451 100644 --- a/example/blocking.c +++ b/example/blocking.c @@ -34,5 +34,5 @@ int main() { lnm_log_init_global(); lnm_log_register_stdout(lnm_log_level_debug); - lnm_http_loop_run(hl, 8080, 1); + printf("res = %i\n", lnm_http_loop_run(hl, 8080, 1, 1)); } diff --git a/include/lnm/http/loop.h b/include/lnm/http/loop.h index 8e36caa..2427179 100644 --- a/include/lnm/http/loop.h +++ b/include/lnm/http/loop.h @@ -81,7 +81,8 @@ lnm_err lnm_http_route_init_regex(lnm_http_route **out, lnm_http_method method, */ lnm_err lnm_http_loop_route_add(lnm_http_loop *hl, lnm_http_route *route); -lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port, int thread_count); +lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port, + size_t epoll_threads, size_t worker_threads); void lnm_http_loop_set_api_key(lnm_http_loop *hl, const char *api_key); diff --git a/include/lnm/loop.h b/include/lnm/loop.h index 4f98349..4570ca1 100644 --- a/include/lnm/loop.h +++ b/include/lnm/loop.h @@ -9,6 +9,7 @@ #include "lnm/common.h" #define LNM_LOOP_BUF_SIZE 2048 +#define LNM_QUEUE_MULTIPLIER 8 typedef enum lnm_loop_state { lnm_loop_state_req = 0, @@ -77,6 +78,12 @@ typedef struct lnm_loop { void (*data_read)(lnm_loop_conn *conn); void (*data_write)(lnm_loop_conn *conn); lnm_loop_queue *wq; + struct { + // Mutex shared between all threads; used for counting thread IDs + pthread_mutex_t mutex; + size_t worker_count; + size_t epoll_count; + } threads; } lnm_loop; lnm_err lnm_loop_init(lnm_loop **out, void *gctx, @@ -87,7 +94,16 @@ lnm_err lnm_loop_init(lnm_loop **out, void *gctx, lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port); -lnm_err lnm_loop_run(lnm_loop *l, int thread_count); +/** + * Run a single epoll thread of the event loop. + */ +lnm_err lnm_loop_run(lnm_loop *l); + +/** + * Run a multithreaded event loop with the configured number of threads. + */ +lnm_err lnm_loop_run_multi(lnm_loop *l, size_t epoll_threads, + size_t worker_threads); /** * Reschedule the given connection, either on the event loop for network IO or diff --git a/src/http/lnm_http_loop.c b/src/http/lnm_http_loop.c index 784126e..153b88c 100644 --- a/src/http/lnm_http_loop.c +++ b/src/http/lnm_http_loop.c @@ -122,9 +122,10 @@ lnm_err lnm_http_loop_route_add(lnm_http_loop *hl, lnm_http_route *route) { return lnm_err_ok; } -lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port, int thread_count) { +lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port, + size_t epoll_threads, size_t worker_threads) { LNM_RES(lnm_loop_setup(hl, port)); - return lnm_loop_run(hl, thread_count); + return lnm_loop_run_multi(hl, epoll_threads, worker_threads); } void lnm_http_loop_set_api_key(lnm_http_loop *hl, const char *api_key) { diff --git a/src/loop/lnm_loop.c b/src/loop/lnm_loop.c index 5379ecc..ac6e5eb 100644 --- a/src/loop/lnm_loop.c +++ b/src/loop/lnm_loop.c @@ -30,6 +30,8 @@ lnm_err lnm_loop_init(lnm_loop **out, void *gctx, l->data_read = data_read; l->data_write = data_write; + pthread_mutex_init(&l->threads.mutex, NULL); + *out = l; return lnm_err_ok; @@ -157,19 +159,21 @@ void lnm_loop_conn_schedule(lnm_loop *l, lnm_loop_conn *conn) { } } -typedef struct lnm_loop_thread_args { - lnm_loop *l; - int id; - int thread_count; -} lnm_loop_thread_args; +lnm_err lnm_loop_run(lnm_loop *l) { + if (l->epoll_fd == 0) { + return lnm_err_not_setup; + } -lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) { - lnm_loop *l = args->l; - int thread_id = args->id; - int thread_count = args->thread_count; + // Get thread ID by incrementing counter + pthread_mutex_lock(&l->threads.mutex); + + int thread_id = l->threads.epoll_count; + l->threads.epoll_count++; + + pthread_mutex_unlock(&l->threads.mutex); struct epoll_event *events = calloc(1, sizeof(struct epoll_event)); - int events_cap = 1; + size_t events_cap = 1; if (events == NULL) { return lnm_err_failed_alloc; @@ -197,9 +201,10 @@ lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) { } } - int open = l->open; - int cap_per_thread = - open + 1 > thread_count ? (open + 1) / thread_count : 1; + size_t open = l->open; + size_t cap_per_thread = open + 1 > l->threads.epoll_count + ? (open + 1) / l->threads.epoll_count + : 1; if (cap_per_thread > events_cap) { struct epoll_event *new_events = @@ -218,28 +223,21 @@ lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) { return lnm_err_ok; } -lnm_err lnm_loop_run(lnm_loop *l, int thread_count) { - if (l->epoll_fd == 0) { - return lnm_err_not_setup; +lnm_err lnm_loop_run_multi(lnm_loop *l, size_t epoll_threads, + size_t worker_threads) { + if (worker_threads > 0) { + LNM_RES(lnm_loop_queue_init(&l->wq, LNM_QUEUE_MULTIPLIER * worker_threads)); } - lnm_loop_thread_args args[thread_count]; + pthread_t t; - for (int i = 1; i < thread_count; i++) { - args[i].l = l; - args[i].id = i; - args[i].thread_count = thread_count; - - pthread_t thread; - pthread_create(&thread, NULL, (void *(*)(void *))lnm_loop_run_thread, - &args[i]); + for (size_t i = 1; i < epoll_threads; i++) { + pthread_create(&t, NULL, (void *(*)(void *))lnm_loop_run, l); } - args[0].l = l; - args[0].id = 0; - args[0].thread_count = thread_count; + for (size_t i = 0; i < worker_threads; i++) { + pthread_create(&t, NULL, (void *(*)(void *))lnm_loop_worker_run, l); + } - lnm_loop_run_thread(&args[0]); - - return lnm_err_ok; + return lnm_loop_run(l); } From a96ae3952388ed24f4ea8e474382b8f88bafbadf Mon Sep 17 00:00:00 2001 From: Chewing_Bever Date: Thu, 1 Feb 2024 18:42:29 +0100 Subject: [PATCH 4/7] feat: add blocking step support to http loop --- example/blocking.c | 2 +- include/lnm/loop.h | 1 + src/_include/lnm/http/loop_internal.h | 1 + src/http/lnm_http_loop_process.c | 9 ++++++++ src/loop/lnm_loop_worker.c | 31 +++++++++++++++++++++------ 5 files changed, 36 insertions(+), 8 deletions(-) diff --git a/example/blocking.c b/example/blocking.c index 5d58451..557bc87 100644 --- a/example/blocking.c +++ b/example/blocking.c @@ -34,5 +34,5 @@ int main() { lnm_log_init_global(); lnm_log_register_stdout(lnm_log_level_debug); - printf("res = %i\n", lnm_http_loop_run(hl, 8080, 1, 1)); + printf("res = %i\n", lnm_http_loop_run(hl, 8080, 1, 32)); } diff --git a/include/lnm/loop.h b/include/lnm/loop.h index 4570ca1..781a745 100644 --- a/include/lnm/loop.h +++ b/include/lnm/loop.h @@ -47,6 +47,7 @@ typedef struct lnm_loop_queue { } buf; size_t head; size_t tail; + bool empty; pthread_mutex_t mutex; pthread_cond_t cond; } lnm_loop_queue; diff --git a/src/_include/lnm/http/loop_internal.h b/src/_include/lnm/http/loop_internal.h index 4d9e865..a343de6 100644 --- a/src/_include/lnm/http/loop_internal.h +++ b/src/_include/lnm/http/loop_internal.h @@ -8,6 +8,7 @@ typedef struct lnm_http_step { lnm_http_step_fn fn; struct lnm_http_step *next; + bool blocking; } lnm_http_step; typedef enum lnm_http_route_type { diff --git a/src/http/lnm_http_loop_process.c b/src/http/lnm_http_loop_process.c index e16e18c..40f7180 100644 --- a/src/http/lnm_http_loop_process.c +++ b/src/http/lnm_http_loop_process.c @@ -133,11 +133,19 @@ void lnm_http_loop_process_steps(lnm_http_conn *conn) { while ((ctx->cur_step != NULL) && (step != ctx->cur_step)) { step = ctx->cur_step; + if (step->blocking && (conn->state != lnm_loop_state_req_blocking)) { + conn->state = lnm_loop_state_req_blocking; + + break; + } + switch (step->fn(conn)) { case lnm_http_step_err_done: ctx->cur_step = ctx->cur_step->next; break; case lnm_http_step_err_io_needed: + // Ensure steps that require more I/O are executed on the event loop + conn->state = lnm_loop_state_req; break; case lnm_http_step_err_close: conn->state = lnm_loop_state_end; @@ -149,6 +157,7 @@ void lnm_http_loop_process_steps(lnm_http_conn *conn) { } if (ctx->cur_step == NULL) { + conn->state = lnm_loop_state_res; ctx->state = lnm_http_loop_state_add_headers; } } diff --git a/src/loop/lnm_loop_worker.c b/src/loop/lnm_loop_worker.c index 2f315cb..5e9d199 100644 --- a/src/loop/lnm_loop_worker.c +++ b/src/loop/lnm_loop_worker.c @@ -1,9 +1,10 @@ #include +#include "lnm/log.h" #include "lnm/loop.h" lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap) { - lnm_loop_conn **arr = malloc(cap * sizeof(lnm_loop_conn *)); + lnm_loop_conn **arr = calloc(cap, sizeof(lnm_loop_conn *)); if (arr == NULL) { return lnm_err_failed_alloc; @@ -20,6 +21,10 @@ lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap) { q->buf.arr = arr; q->buf.len = cap; + q->tail = 0; + q->head = 0; + q->empty = true; + pthread_mutex_init(&q->mutex, NULL); pthread_cond_init(&q->cond, NULL); @@ -31,14 +36,15 @@ lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap) { void lnm_loop_queue_push(lnm_loop_queue *q, lnm_loop_conn *conn) { pthread_mutex_lock(&q->mutex); - while (q->tail == q->head) { + while (q->head == q->tail && !q->empty) { pthread_cond_wait(&q->cond, &q->mutex); } - q->buf.arr[q->tail] = conn; + q->buf.arr[q->head] = conn; // Make sure the index wraps around - q->tail = (q->tail + 1) % q->buf.len; + q->head = (q->head + 1) % q->buf.len; + q->empty = false; // Unlock mutex and signal to waiting threads pthread_mutex_unlock(&q->mutex); @@ -48,12 +54,14 @@ void lnm_loop_queue_push(lnm_loop_queue *q, lnm_loop_conn *conn) { lnm_loop_conn *lnm_loop_queue_pop(lnm_loop_queue *q) { pthread_mutex_lock(&q->mutex); - while (q->tail == q->head) { + while (q->empty) { pthread_cond_wait(&q->cond, &q->mutex); } - lnm_loop_conn *out = q->buf.arr[q->head]; - q->head = (q->head + 1) % q->buf.len; + lnm_loop_conn *out = q->buf.arr[q->tail]; + + q->tail = (q->tail + 1) % q->buf.len; + q->empty = q->tail == q->head; // Unlock mutex and signal to waiting threads pthread_mutex_unlock(&q->mutex); @@ -66,8 +74,17 @@ void lnm_loop_worker_run(void *arg) { lnm_loop *l = arg; lnm_loop_queue *q = l->wq; + // Get thread ID by incrementing counter + pthread_mutex_lock(&l->threads.mutex); + + int thread_id = l->threads.worker_count; + l->threads.worker_count++; + + pthread_mutex_unlock(&l->threads.mutex); + while (1) { lnm_loop_conn *conn = lnm_loop_queue_pop(q); + lnm_ldebug("loop", "worker %i processing fd %i", thread_id, conn->fd); switch (conn->state) { case lnm_loop_state_req_blocking: From 64601b5f21d11cdc9391ad6a32fec6976b900613 Mon Sep 17 00:00:00 2001 From: Chewing_Bever Date: Thu, 1 Feb 2024 18:51:05 +0100 Subject: [PATCH 5/7] fix: use send instead of write to prevent SIGPIPE --- src/loop/lnm_loop_io.c | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/loop/lnm_loop_io.c b/src/loop/lnm_loop_io.c index 363d6af..34da2d9 100644 --- a/src/loop/lnm_loop_io.c +++ b/src/loop/lnm_loop_io.c @@ -1,6 +1,7 @@ #include #include #include +#include #include "lnm/loop.h" #include "lnm/loop_internal.h" @@ -43,7 +44,9 @@ void lnm_loop_conn_io_res(lnm_loop *l, lnm_loop_conn *conn) { ssize_t res; do { - res = write(conn->fd, conn->w.buf, conn->w.size); + // Send with MSG_NOSIGNAL prevents closed pipes from exiting the program + // with SIGPIPE + res = send(conn->fd, conn->w.buf, conn->w.size, MSG_NOSIGNAL); } while (res < 0 && errno == EINTR); // Write can't be performed without blocking; we come back later From 1b8ba305b5bc080249401894e3a552efa077d781 Mon Sep 17 00:00:00 2001 From: Chewing_Bever Date: Wed, 14 Feb 2024 12:07:14 +0100 Subject: [PATCH 6/7] chore: rename event loop states --- include/lnm/loop.h | 8 ++++---- src/http/lnm_http_loop_process.c | 28 ++++++++++++++-------------- src/loop/lnm_loop.c | 12 ++++++------ src/loop/lnm_loop_io.c | 10 +++++----- src/loop/lnm_loop_worker.c | 4 ++-- 5 files changed, 31 insertions(+), 31 deletions(-) diff --git a/include/lnm/loop.h b/include/lnm/loop.h index 781a745..09041bc 100644 --- a/include/lnm/loop.h +++ b/include/lnm/loop.h @@ -12,11 +12,11 @@ #define LNM_QUEUE_MULTIPLIER 8 typedef enum lnm_loop_state { - lnm_loop_state_req = 0, - lnm_loop_state_res, + lnm_loop_state_req_io = 0, + lnm_loop_state_res_io, lnm_loop_state_end, - lnm_loop_state_req_blocking, - lnm_loop_state_res_blocking, + lnm_loop_state_req_work, + lnm_loop_state_res_work, } lnm_loop_state; /** diff --git a/src/http/lnm_http_loop_process.c b/src/http/lnm_http_loop_process.c index 40f7180..1e465cf 100644 --- a/src/http/lnm_http_loop_process.c +++ b/src/http/lnm_http_loop_process.c @@ -133,8 +133,8 @@ void lnm_http_loop_process_steps(lnm_http_conn *conn) { while ((ctx->cur_step != NULL) && (step != ctx->cur_step)) { step = ctx->cur_step; - if (step->blocking && (conn->state != lnm_loop_state_req_blocking)) { - conn->state = lnm_loop_state_req_blocking; + if (step->blocking && (conn->state != lnm_loop_state_req_work)) { + conn->state = lnm_loop_state_req_work; break; } @@ -145,7 +145,7 @@ void lnm_http_loop_process_steps(lnm_http_conn *conn) { break; case lnm_http_step_err_io_needed: // Ensure steps that require more I/O are executed on the event loop - conn->state = lnm_loop_state_req; + conn->state = lnm_loop_state_req_io; break; case lnm_http_step_err_close: conn->state = lnm_loop_state_end; @@ -157,7 +157,7 @@ void lnm_http_loop_process_steps(lnm_http_conn *conn) { } if (ctx->cur_step == NULL) { - conn->state = lnm_loop_state_res; + conn->state = lnm_loop_state_res_io; ctx->state = lnm_http_loop_state_add_headers; } } @@ -334,23 +334,23 @@ void (*process_fns[])(lnm_http_conn *conn) = { lnm_loop_state state_map[] = { // parse_req - lnm_loop_state_req, + lnm_loop_state_req_io, // route - lnm_loop_state_req, + lnm_loop_state_req_io, // parse_headers - lnm_loop_state_req, + lnm_loop_state_req_io, // steps - lnm_loop_state_req, + lnm_loop_state_req_io, // add_headers - lnm_loop_state_req, + lnm_loop_state_req_io, // write_status_line - lnm_loop_state_res, + lnm_loop_state_res_io, // write_headers - lnm_loop_state_res, + lnm_loop_state_res_io, // write_body - lnm_loop_state_res, + lnm_loop_state_res_io, // finish - lnm_loop_state_res, + lnm_loop_state_res_io, }; void lnm_http_loop_process(lnm_http_conn *conn) { @@ -378,7 +378,7 @@ void lnm_http_loop_process(lnm_http_conn *conn) { // We move the request to a dedicated buffer if the read buffer needs to be // reused - if ((conn->state == lnm_loop_state_req) && (conn->state == loop_state) && + if ((conn->state == lnm_loop_state_req_io) && (conn->state == loop_state) && (!ctx->req.buf.owned) && (ctx->req.buf.len > 0)) { char *buf = malloc(ctx->req.buf.len); diff --git a/src/loop/lnm_loop.c b/src/loop/lnm_loop.c index ac6e5eb..de74902 100644 --- a/src/loop/lnm_loop.c +++ b/src/loop/lnm_loop.c @@ -55,7 +55,7 @@ lnm_err lnm_loop_accept(lnm_loop *l) { LNM_RES2(lnm_loop_conn_init(&conn, l), close(conn_fd)); conn->fd = conn_fd; - conn->state = lnm_loop_state_req; + conn->state = lnm_loop_state_req_io; struct epoll_event event = {.data.ptr = conn, .events = EPOLLIN | EPOLLET | EPOLLONESHOT}; @@ -132,17 +132,17 @@ lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port) { void lnm_loop_conn_schedule(lnm_loop *l, lnm_loop_conn *conn) { switch (conn->state) { // IO states get rescheduled in the epoll loop - case lnm_loop_state_req: - case lnm_loop_state_res: { + case lnm_loop_state_req_io: + case lnm_loop_state_res_io: { struct epoll_event event = { .data.ptr = conn, - .events = (conn->state == lnm_loop_state_req ? EPOLLIN : EPOLLOUT) | + .events = (conn->state == lnm_loop_state_req_io ? EPOLLIN : EPOLLOUT) | EPOLLET | EPOLLONESHOT}; epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, conn->fd, &event); } break; - case lnm_loop_state_req_blocking: - case lnm_loop_state_res_blocking: + case lnm_loop_state_req_work: + case lnm_loop_state_res_work: lnm_loop_queue_push(l->wq, conn); break; case lnm_loop_state_end: { diff --git a/src/loop/lnm_loop_io.c b/src/loop/lnm_loop_io.c index 34da2d9..15fba24 100644 --- a/src/loop/lnm_loop_io.c +++ b/src/loop/lnm_loop_io.c @@ -1,7 +1,7 @@ #include #include -#include #include +#include #include "lnm/loop.h" #include "lnm/loop_internal.h" @@ -34,7 +34,7 @@ void lnm_loop_conn_io_req(lnm_loop *l, lnm_loop_conn *conn) { conn->r.size += res; l->data_read(conn); - } while (conn->state == lnm_loop_state_req); + } while (conn->state == lnm_loop_state_req_io); } void lnm_loop_conn_io_res(lnm_loop *l, lnm_loop_conn *conn) { @@ -64,15 +64,15 @@ void lnm_loop_conn_io_res(lnm_loop *l, lnm_loop_conn *conn) { // writer function more space to work with memmove(conn->w.buf, &conn->w.buf[res], conn->w.size - res); conn->w.size -= res; - } while (conn->state == lnm_loop_state_res); + } while (conn->state == lnm_loop_state_res_io); } void lnm_loop_conn_io(lnm_loop *l, lnm_loop_conn *conn) { switch (conn->state) { - case lnm_loop_state_req: + case lnm_loop_state_req_io: lnm_loop_conn_io_req(l, conn); break; - case lnm_loop_state_res: + case lnm_loop_state_res_io: lnm_loop_conn_io_res(l, conn); break; default:; diff --git a/src/loop/lnm_loop_worker.c b/src/loop/lnm_loop_worker.c index 5e9d199..6d44406 100644 --- a/src/loop/lnm_loop_worker.c +++ b/src/loop/lnm_loop_worker.c @@ -87,10 +87,10 @@ void lnm_loop_worker_run(void *arg) { lnm_ldebug("loop", "worker %i processing fd %i", thread_id, conn->fd); switch (conn->state) { - case lnm_loop_state_req_blocking: + case lnm_loop_state_req_work: l->data_read(conn); break; - case lnm_loop_state_res_blocking: + case lnm_loop_state_res_work: l->data_write(conn); // Other states shouldn't even end up here, so we ignore them default:; From 1f63f06e0c733d069457a30ad3852c11811f1612 Mon Sep 17 00:00:00 2001 From: Chewing_Bever Date: Wed, 14 Feb 2024 12:43:58 +0100 Subject: [PATCH 7/7] fix: transparently support blocking work without worker threads --- example/blocking.c | 4 ++-- include/lnm/http/loop.h | 17 +++++----------- include/lnm/loop.h | 20 +++++++++++++++++++ src/_include/lnm/loop_internal.h | 2 +- src/http/lnm_http_loop.c | 18 +++++++---------- src/loop/lnm_loop.c | 2 +- src/loop/lnm_loop_io.c | 34 +++++++++++++++++++++++--------- src/loop/lnm_loop_worker.c | 11 +---------- 8 files changed, 62 insertions(+), 46 deletions(-) diff --git a/example/blocking.c b/example/blocking.c index 557bc87..a145ed5 100644 --- a/example/blocking.c +++ b/example/blocking.c @@ -27,12 +27,12 @@ int main() { ctx_reset, ctx_free); - lnm_http_step_init(&step, slow_step); + lnm_http_step_append(&step, slow_step, true); lnm_http_route_init_literal(&route, lnm_http_method_get, "/", step); lnm_http_loop_route_add(hl, route); lnm_log_init_global(); lnm_log_register_stdout(lnm_log_level_debug); - printf("res = %i\n", lnm_http_loop_run(hl, 8080, 1, 32)); + printf("res = %i\n", lnm_http_loop_run(hl, 8080, 1, 2)); } diff --git a/include/lnm/http/loop.h b/include/lnm/http/loop.h index 2427179..fefdf6a 100644 --- a/include/lnm/http/loop.h +++ b/include/lnm/http/loop.h @@ -32,23 +32,16 @@ lnm_err lnm_http_loop_init(lnm_http_loop **out, void *c_gctx, lnm_http_ctx_reset_fn ctx_reset, lnm_http_ctx_free_fn ctx_free); -/** - * Initialize a new step. - * - * @param out where to store pointer to new `lnm_http_step` - * @param fn step function - */ -lnm_err lnm_http_step_init(lnm_http_step **out, lnm_http_step_fn fn); - /** * Append the given step fn to the step. * - * @param out where to store pointer to new `lnm_http_step` - * @param step step to append new step to + * @param out both the previous step to append the new step to, and the output + * variable to which the new step is appended * @param fn step function + * @param blocking whether the step is blocking or not */ -lnm_err lnm_http_step_append(lnm_http_step **out, lnm_http_step *step, - lnm_http_step_fn fn); +lnm_err lnm_http_step_append(lnm_http_step **out, lnm_http_step_fn fn, + bool blocking); /** * Initialize a new route of type literal. diff --git a/include/lnm/loop.h b/include/lnm/loop.h index 09041bc..b922c1f 100644 --- a/include/lnm/loop.h +++ b/include/lnm/loop.h @@ -106,6 +106,26 @@ lnm_err lnm_loop_run(lnm_loop *l); lnm_err lnm_loop_run_multi(lnm_loop *l, size_t epoll_threads, size_t worker_threads); +/** + * Advance the processing of the given connection. + * + * Behavior of this function depends on both the connection state and whether + * worker threads are enabled. + * + * For IO states, this function will perform network I/O along with executing + * the loop's respective processing functions. + * + * For work states, the respective processing functions are executed without + * performing any network I/O. If no worker queue is present, this function + * performs all blocking work until an I/O or the end state is reached. If there + * is a worker queue present, only one block of work is done before exiting, + * allowing further blocks of work to be scheduled on other worker threads. + * + * If no worker queue is present, this function will only exit once an I/O or + * end state is reached. + */ +void lnm_loop_conn_advance(lnm_loop *l, lnm_loop_conn *conn); + /** * Reschedule the given connection, either on the event loop for network IO or * on a worker thread for blocking work. Connections are terminated as needed. diff --git a/src/_include/lnm/loop_internal.h b/src/_include/lnm/loop_internal.h index a5e70a8..c71f303 100644 --- a/src/_include/lnm/loop_internal.h +++ b/src/_include/lnm/loop_internal.h @@ -9,6 +9,6 @@ void lnm_loop_conn_free(lnm_loop *l, lnm_loop_conn *conn); lnm_err lnm_loop_accept(lnm_loop *l); -void lnm_loop_conn_io(lnm_loop *l, lnm_loop_conn *conn); +void lnm_loop_conn_advance(lnm_loop *l, lnm_loop_conn *conn); #endif diff --git a/src/http/lnm_http_loop.c b/src/http/lnm_http_loop.c index 153b88c..bd83526 100644 --- a/src/http/lnm_http_loop.c +++ b/src/http/lnm_http_loop.c @@ -28,7 +28,8 @@ lnm_err lnm_http_loop_init(lnm_http_loop **out, void *c_gctx, return lnm_err_ok; } -lnm_err lnm_http_step_init(lnm_http_step **out, lnm_http_step_fn fn) { +lnm_err lnm_http_step_append(lnm_http_step **out, lnm_http_step_fn fn, + bool blocking) { lnm_http_step *step = calloc(1, sizeof(lnm_http_step)); if (step == NULL) { @@ -36,19 +37,14 @@ lnm_err lnm_http_step_init(lnm_http_step **out, lnm_http_step_fn fn) { } step->fn = fn; - *out = step; + step->blocking = blocking; - return lnm_err_ok; -} - -lnm_err lnm_http_step_append(lnm_http_step **out, lnm_http_step *step, - lnm_http_step_fn fn) { - LNM_RES(lnm_http_step_init(out, fn)); - - if (step != NULL) { - step->next = *out; + if ((*out) != NULL) { + (*out)->next = step; } + *out = step; + return lnm_err_ok; } diff --git a/src/loop/lnm_loop.c b/src/loop/lnm_loop.c index de74902..5154b1e 100644 --- a/src/loop/lnm_loop.c +++ b/src/loop/lnm_loop.c @@ -196,7 +196,7 @@ lnm_err lnm_loop_run(lnm_loop *l) { lnm_loop_conn *conn = events[i].data.ptr; // At this point, state is always an IO state - lnm_loop_conn_io(l, conn); + lnm_loop_conn_advance(l, conn); lnm_loop_conn_schedule(l, conn); } } diff --git a/src/loop/lnm_loop_io.c b/src/loop/lnm_loop_io.c index 15fba24..58f5577 100644 --- a/src/loop/lnm_loop_io.c +++ b/src/loop/lnm_loop_io.c @@ -67,14 +67,30 @@ void lnm_loop_conn_io_res(lnm_loop *l, lnm_loop_conn *conn) { } while (conn->state == lnm_loop_state_res_io); } -void lnm_loop_conn_io(lnm_loop *l, lnm_loop_conn *conn) { - switch (conn->state) { - case lnm_loop_state_req_io: - lnm_loop_conn_io_req(l, conn); - break; - case lnm_loop_state_res_io: - lnm_loop_conn_io_res(l, conn); - break; - default:; +void lnm_loop_conn_advance(lnm_loop *l, lnm_loop_conn *conn) { + do { + + switch (conn->state) { + case lnm_loop_state_req_io: + lnm_loop_conn_io_req(l, conn); + break; + case lnm_loop_state_res_io: + lnm_loop_conn_io_res(l, conn); + break; + case lnm_loop_state_req_work: + do { + l->data_read(conn); + } while (conn->state == lnm_loop_state_req_work); + break; + case lnm_loop_state_res_work: + do { + l->data_write(conn); + } while (conn->state == lnm_loop_state_res_work); + break; + default:; + } } + // Execute all blocking work if we're running in single-threaded mode + while (l->wq == NULL && (conn->state == lnm_loop_state_req_work || + conn->state == lnm_loop_state_res_work)); } diff --git a/src/loop/lnm_loop_worker.c b/src/loop/lnm_loop_worker.c index 6d44406..fba4848 100644 --- a/src/loop/lnm_loop_worker.c +++ b/src/loop/lnm_loop_worker.c @@ -86,16 +86,7 @@ void lnm_loop_worker_run(void *arg) { lnm_loop_conn *conn = lnm_loop_queue_pop(q); lnm_ldebug("loop", "worker %i processing fd %i", thread_id, conn->fd); - switch (conn->state) { - case lnm_loop_state_req_work: - l->data_read(conn); - break; - case lnm_loop_state_res_work: - l->data_write(conn); - // Other states shouldn't even end up here, so we ignore them - default:; - } - + lnm_loop_conn_advance(l, conn); lnm_loop_conn_schedule(l, conn); } }