diff --git a/example/blocking.c b/example/blocking.c index 9b395b4..5d58451 100644 --- a/example/blocking.c +++ b/example/blocking.c @@ -34,5 +34,5 @@ int main() { lnm_log_init_global(); lnm_log_register_stdout(lnm_log_level_debug); - lnm_http_loop_run(hl, 8080, 1); + printf("res = %i\n", lnm_http_loop_run(hl, 8080, 1, 1)); } diff --git a/include/lnm/http/loop.h b/include/lnm/http/loop.h index 8e36caa..2427179 100644 --- a/include/lnm/http/loop.h +++ b/include/lnm/http/loop.h @@ -81,7 +81,8 @@ lnm_err lnm_http_route_init_regex(lnm_http_route **out, lnm_http_method method, */ lnm_err lnm_http_loop_route_add(lnm_http_loop *hl, lnm_http_route *route); -lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port, int thread_count); +lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port, + size_t epoll_threads, size_t worker_threads); void lnm_http_loop_set_api_key(lnm_http_loop *hl, const char *api_key); diff --git a/include/lnm/loop.h b/include/lnm/loop.h index 4f98349..4570ca1 100644 --- a/include/lnm/loop.h +++ b/include/lnm/loop.h @@ -9,6 +9,7 @@ #include "lnm/common.h" #define LNM_LOOP_BUF_SIZE 2048 +#define LNM_QUEUE_MULTIPLIER 8 typedef enum lnm_loop_state { lnm_loop_state_req = 0, @@ -77,6 +78,12 @@ typedef struct lnm_loop { void (*data_read)(lnm_loop_conn *conn); void (*data_write)(lnm_loop_conn *conn); lnm_loop_queue *wq; + struct { + // Mutex shared between all threads; used for counting thread IDs + pthread_mutex_t mutex; + size_t worker_count; + size_t epoll_count; + } threads; } lnm_loop; lnm_err lnm_loop_init(lnm_loop **out, void *gctx, @@ -87,7 +94,16 @@ lnm_err lnm_loop_init(lnm_loop **out, void *gctx, lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port); -lnm_err lnm_loop_run(lnm_loop *l, int thread_count); +/** + * Run a single epoll thread of the event loop. + */ +lnm_err lnm_loop_run(lnm_loop *l); + +/** + * Run a multithreaded event loop with the configured number of threads. + */ +lnm_err lnm_loop_run_multi(lnm_loop *l, size_t epoll_threads, + size_t worker_threads); /** * Reschedule the given connection, either on the event loop for network IO or diff --git a/src/http/lnm_http_loop.c b/src/http/lnm_http_loop.c index 784126e..153b88c 100644 --- a/src/http/lnm_http_loop.c +++ b/src/http/lnm_http_loop.c @@ -122,9 +122,10 @@ lnm_err lnm_http_loop_route_add(lnm_http_loop *hl, lnm_http_route *route) { return lnm_err_ok; } -lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port, int thread_count) { +lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port, + size_t epoll_threads, size_t worker_threads) { LNM_RES(lnm_loop_setup(hl, port)); - return lnm_loop_run(hl, thread_count); + return lnm_loop_run_multi(hl, epoll_threads, worker_threads); } void lnm_http_loop_set_api_key(lnm_http_loop *hl, const char *api_key) { diff --git a/src/loop/lnm_loop.c b/src/loop/lnm_loop.c index 5379ecc..ac6e5eb 100644 --- a/src/loop/lnm_loop.c +++ b/src/loop/lnm_loop.c @@ -30,6 +30,8 @@ lnm_err lnm_loop_init(lnm_loop **out, void *gctx, l->data_read = data_read; l->data_write = data_write; + pthread_mutex_init(&l->threads.mutex, NULL); + *out = l; return lnm_err_ok; @@ -157,19 +159,21 @@ void lnm_loop_conn_schedule(lnm_loop *l, lnm_loop_conn *conn) { } } -typedef struct lnm_loop_thread_args { - lnm_loop *l; - int id; - int thread_count; -} lnm_loop_thread_args; +lnm_err lnm_loop_run(lnm_loop *l) { + if (l->epoll_fd == 0) { + return lnm_err_not_setup; + } -lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) { - lnm_loop *l = args->l; - int thread_id = args->id; - int thread_count = args->thread_count; + // Get thread ID by incrementing counter + pthread_mutex_lock(&l->threads.mutex); + + int thread_id = l->threads.epoll_count; + l->threads.epoll_count++; + + pthread_mutex_unlock(&l->threads.mutex); struct epoll_event *events = calloc(1, sizeof(struct epoll_event)); - int events_cap = 1; + size_t events_cap = 1; if (events == NULL) { return lnm_err_failed_alloc; @@ -197,9 +201,10 @@ lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) { } } - int open = l->open; - int cap_per_thread = - open + 1 > thread_count ? (open + 1) / thread_count : 1; + size_t open = l->open; + size_t cap_per_thread = open + 1 > l->threads.epoll_count + ? (open + 1) / l->threads.epoll_count + : 1; if (cap_per_thread > events_cap) { struct epoll_event *new_events = @@ -218,28 +223,21 @@ lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) { return lnm_err_ok; } -lnm_err lnm_loop_run(lnm_loop *l, int thread_count) { - if (l->epoll_fd == 0) { - return lnm_err_not_setup; +lnm_err lnm_loop_run_multi(lnm_loop *l, size_t epoll_threads, + size_t worker_threads) { + if (worker_threads > 0) { + LNM_RES(lnm_loop_queue_init(&l->wq, LNM_QUEUE_MULTIPLIER * worker_threads)); } - lnm_loop_thread_args args[thread_count]; + pthread_t t; - for (int i = 1; i < thread_count; i++) { - args[i].l = l; - args[i].id = i; - args[i].thread_count = thread_count; - - pthread_t thread; - pthread_create(&thread, NULL, (void *(*)(void *))lnm_loop_run_thread, - &args[i]); + for (size_t i = 1; i < epoll_threads; i++) { + pthread_create(&t, NULL, (void *(*)(void *))lnm_loop_run, l); } - args[0].l = l; - args[0].id = 0; - args[0].thread_count = thread_count; + for (size_t i = 0; i < worker_threads; i++) { + pthread_create(&t, NULL, (void *(*)(void *))lnm_loop_worker_run, l); + } - lnm_loop_run_thread(&args[0]); - - return lnm_err_ok; + return lnm_loop_run(l); }