feat: change lnm loop multithreading api

blocking
Jef Roosens 2024-02-01 15:37:35 +01:00
parent ccb9b77cc8
commit 9c01548256
Signed by: Jef Roosens
GPG Key ID: B75D4F293C7052DB
5 changed files with 52 additions and 36 deletions

View File

@ -34,5 +34,5 @@ int main() {
lnm_log_init_global(); lnm_log_init_global();
lnm_log_register_stdout(lnm_log_level_debug); lnm_log_register_stdout(lnm_log_level_debug);
lnm_http_loop_run(hl, 8080, 1); printf("res = %i\n", lnm_http_loop_run(hl, 8080, 1, 1));
} }

View File

@ -81,7 +81,8 @@ lnm_err lnm_http_route_init_regex(lnm_http_route **out, lnm_http_method method,
*/ */
lnm_err lnm_http_loop_route_add(lnm_http_loop *hl, lnm_http_route *route); lnm_err lnm_http_loop_route_add(lnm_http_loop *hl, lnm_http_route *route);
lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port, int thread_count); lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port,
size_t epoll_threads, size_t worker_threads);
void lnm_http_loop_set_api_key(lnm_http_loop *hl, const char *api_key); void lnm_http_loop_set_api_key(lnm_http_loop *hl, const char *api_key);

View File

@ -9,6 +9,7 @@
#include "lnm/common.h" #include "lnm/common.h"
#define LNM_LOOP_BUF_SIZE 2048 #define LNM_LOOP_BUF_SIZE 2048
#define LNM_QUEUE_MULTIPLIER 8
typedef enum lnm_loop_state { typedef enum lnm_loop_state {
lnm_loop_state_req = 0, lnm_loop_state_req = 0,
@ -77,6 +78,12 @@ typedef struct lnm_loop {
void (*data_read)(lnm_loop_conn *conn); void (*data_read)(lnm_loop_conn *conn);
void (*data_write)(lnm_loop_conn *conn); void (*data_write)(lnm_loop_conn *conn);
lnm_loop_queue *wq; lnm_loop_queue *wq;
struct {
// Mutex shared between all threads; used for counting thread IDs
pthread_mutex_t mutex;
size_t worker_count;
size_t epoll_count;
} threads;
} lnm_loop; } lnm_loop;
lnm_err lnm_loop_init(lnm_loop **out, void *gctx, lnm_err lnm_loop_init(lnm_loop **out, void *gctx,
@ -87,7 +94,16 @@ lnm_err lnm_loop_init(lnm_loop **out, void *gctx,
lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port); lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port);
lnm_err lnm_loop_run(lnm_loop *l, int thread_count); /**
* Run a single epoll thread of the event loop.
*/
lnm_err lnm_loop_run(lnm_loop *l);
/**
* Run a multithreaded event loop with the configured number of threads.
*/
lnm_err lnm_loop_run_multi(lnm_loop *l, size_t epoll_threads,
size_t worker_threads);
/** /**
* Reschedule the given connection, either on the event loop for network IO or * Reschedule the given connection, either on the event loop for network IO or

View File

@ -122,9 +122,10 @@ lnm_err lnm_http_loop_route_add(lnm_http_loop *hl, lnm_http_route *route) {
return lnm_err_ok; return lnm_err_ok;
} }
lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port, int thread_count) { lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port,
size_t epoll_threads, size_t worker_threads) {
LNM_RES(lnm_loop_setup(hl, port)); LNM_RES(lnm_loop_setup(hl, port));
return lnm_loop_run(hl, thread_count); return lnm_loop_run_multi(hl, epoll_threads, worker_threads);
} }
void lnm_http_loop_set_api_key(lnm_http_loop *hl, const char *api_key) { void lnm_http_loop_set_api_key(lnm_http_loop *hl, const char *api_key) {

View File

@ -30,6 +30,8 @@ lnm_err lnm_loop_init(lnm_loop **out, void *gctx,
l->data_read = data_read; l->data_read = data_read;
l->data_write = data_write; l->data_write = data_write;
pthread_mutex_init(&l->threads.mutex, NULL);
*out = l; *out = l;
return lnm_err_ok; return lnm_err_ok;
@ -157,19 +159,21 @@ void lnm_loop_conn_schedule(lnm_loop *l, lnm_loop_conn *conn) {
} }
} }
typedef struct lnm_loop_thread_args { lnm_err lnm_loop_run(lnm_loop *l) {
lnm_loop *l; if (l->epoll_fd == 0) {
int id; return lnm_err_not_setup;
int thread_count; }
} lnm_loop_thread_args;
lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) { // Get thread ID by incrementing counter
lnm_loop *l = args->l; pthread_mutex_lock(&l->threads.mutex);
int thread_id = args->id;
int thread_count = args->thread_count; int thread_id = l->threads.epoll_count;
l->threads.epoll_count++;
pthread_mutex_unlock(&l->threads.mutex);
struct epoll_event *events = calloc(1, sizeof(struct epoll_event)); struct epoll_event *events = calloc(1, sizeof(struct epoll_event));
int events_cap = 1; size_t events_cap = 1;
if (events == NULL) { if (events == NULL) {
return lnm_err_failed_alloc; return lnm_err_failed_alloc;
@ -197,9 +201,10 @@ lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) {
} }
} }
int open = l->open; size_t open = l->open;
int cap_per_thread = size_t cap_per_thread = open + 1 > l->threads.epoll_count
open + 1 > thread_count ? (open + 1) / thread_count : 1; ? (open + 1) / l->threads.epoll_count
: 1;
if (cap_per_thread > events_cap) { if (cap_per_thread > events_cap) {
struct epoll_event *new_events = struct epoll_event *new_events =
@ -218,28 +223,21 @@ lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) {
return lnm_err_ok; return lnm_err_ok;
} }
lnm_err lnm_loop_run(lnm_loop *l, int thread_count) { lnm_err lnm_loop_run_multi(lnm_loop *l, size_t epoll_threads,
if (l->epoll_fd == 0) { size_t worker_threads) {
return lnm_err_not_setup; if (worker_threads > 0) {
LNM_RES(lnm_loop_queue_init(&l->wq, LNM_QUEUE_MULTIPLIER * worker_threads));
} }
lnm_loop_thread_args args[thread_count]; pthread_t t;
for (int i = 1; i < thread_count; i++) { for (size_t i = 1; i < epoll_threads; i++) {
args[i].l = l; pthread_create(&t, NULL, (void *(*)(void *))lnm_loop_run, l);
args[i].id = i;
args[i].thread_count = thread_count;
pthread_t thread;
pthread_create(&thread, NULL, (void *(*)(void *))lnm_loop_run_thread,
&args[i]);
} }
args[0].l = l; for (size_t i = 0; i < worker_threads; i++) {
args[0].id = 0; pthread_create(&t, NULL, (void *(*)(void *))lnm_loop_worker_run, l);
args[0].thread_count = thread_count; }
lnm_loop_run_thread(&args[0]); return lnm_loop_run(l);
return lnm_err_ok;
} }