feat: further lay groundwork for worker threads

blocking
Jef Roosens 2024-02-01 14:00:03 +01:00
parent 3490c2dd42
commit ccb9b77cc8
Signed by: Jef Roosens
GPG Key ID: B75D4F293C7052DB
3 changed files with 100 additions and 47 deletions

View File

@ -10,12 +10,17 @@
#define LNM_LOOP_BUF_SIZE 2048
typedef enum {
typedef enum lnm_loop_state {
lnm_loop_state_req = 0,
lnm_loop_state_res,
lnm_loop_state_end,
lnm_loop_state_req_blocking,
lnm_loop_state_res_blocking,
} lnm_loop_state;
/**
* State for a currently active connection
*/
typedef struct lnm_loop_conn {
int fd;
lnm_loop_state state;
@ -31,29 +36,8 @@ typedef struct lnm_loop_conn {
} w;
} lnm_loop_conn;
typedef struct lnm_loop {
int listen_fd;
int epoll_fd;
atomic_int open;
void *gctx;
lnm_err (*ctx_init)(void **out, void *gctx);
void (*ctx_free)(void *ctx);
void (*data_read)(lnm_loop_conn *conn);
void (*data_write)(lnm_loop_conn *conn);
} lnm_loop;
lnm_err lnm_loop_init(lnm_loop **out, void *gctx,
lnm_err (*ctx_init)(void **out, void *gctx),
void (*ctx_free)(void *ctx),
void (*data_read)(lnm_loop_conn *conn),
void (*data_write)(lnm_loop_conn *conn));
lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port);
lnm_err lnm_loop_run(lnm_loop *l, int thread_count);
/**
* Concurrent fixed-size queue used by the worked threads to distribute tasks
* Concurrent fixed-size queue used to distribute work among worker threads
*/
typedef struct lnm_loop_queue {
struct {
@ -83,4 +67,37 @@ void lnm_loop_queue_push(lnm_loop_queue *q, lnm_loop_conn *conn);
*/
lnm_loop_conn *lnm_loop_queue_pop(lnm_loop_queue *q);
typedef struct lnm_loop {
int listen_fd;
int epoll_fd;
atomic_int open;
void *gctx;
lnm_err (*ctx_init)(void **out, void *gctx);
void (*ctx_free)(void *ctx);
void (*data_read)(lnm_loop_conn *conn);
void (*data_write)(lnm_loop_conn *conn);
lnm_loop_queue *wq;
} lnm_loop;
lnm_err lnm_loop_init(lnm_loop **out, void *gctx,
lnm_err (*ctx_init)(void **out, void *gctx),
void (*ctx_free)(void *ctx),
void (*data_read)(lnm_loop_conn *conn),
void (*data_write)(lnm_loop_conn *conn));
lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port);
lnm_err lnm_loop_run(lnm_loop *l, int thread_count);
/**
* Reschedule the given connection, either on the event loop for network IO or
* on a worker thread for blocking work. Connections are terminated as needed.
*/
void lnm_loop_conn_schedule(lnm_loop *l, lnm_loop_conn *conn);
/**
* Main loop executed on the worker threads.
*/
void lnm_loop_worker_run(void *arg);
#endif

View File

@ -62,6 +62,10 @@ lnm_err lnm_loop_accept(lnm_loop *l) {
l->open++;
// Make sure to re-arm the listening socket after accepting
event.data.ptr = NULL;
epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, l->listen_fd, &event);
lnm_ldebug(section, "connection opened with fd %i", conn_fd);
return lnm_err_ok;
@ -123,6 +127,36 @@ lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port) {
return lnm_err_ok;
}
void lnm_loop_conn_schedule(lnm_loop *l, lnm_loop_conn *conn) {
switch (conn->state) {
// IO states get rescheduled in the epoll loop
case lnm_loop_state_req:
case lnm_loop_state_res: {
struct epoll_event event = {
.data.ptr = conn,
.events = (conn->state == lnm_loop_state_req ? EPOLLIN : EPOLLOUT) |
EPOLLET | EPOLLONESHOT};
epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, conn->fd, &event);
} break;
case lnm_loop_state_req_blocking:
case lnm_loop_state_res_blocking:
lnm_loop_queue_push(l->wq, conn);
break;
case lnm_loop_state_end: {
int conn_fd = conn->fd;
lnm_loop_conn_free(l, conn);
close(conn_fd);
l->open--;
epoll_ctl(l->epoll_fd, EPOLL_CTL_DEL, conn_fd, NULL);
lnm_ldebug(section, "connection closed with fd %i", conn_fd);
} break;
}
}
typedef struct lnm_loop_thread_args {
lnm_loop *l;
int id;
@ -143,9 +177,6 @@ lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) {
lnm_linfo(section, "thread %i started", thread_id);
struct epoll_event listen_event = {
.data.ptr = NULL, .events = EPOLLIN | EPOLLET | EPOLLONESHOT};
while (1) {
int polled = epoll_wait(l->epoll_fd, events, events_cap, -1);
lnm_ldebug(section, "polled (thread %i): %i", thread_id, polled);
@ -157,30 +188,12 @@ lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) {
for (int i = 0; i < polled; i++) {
if (events[i].data.ptr == NULL) {
lnm_loop_accept(l);
epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, l->listen_fd, &listen_event);
} else {
lnm_loop_conn *conn = events[i].data.ptr;
// At this point, state is always an IO state
lnm_loop_conn_io(l, conn);
if (conn->state == lnm_loop_state_end) {
int conn_fd = conn->fd;
lnm_loop_conn_free(l, conn);
close(conn_fd);
l->open--;
epoll_ctl(l->epoll_fd, EPOLL_CTL_DEL, conn_fd, NULL);
lnm_ldebug(section, "connection closed with fd %i", conn_fd);
} else {
struct epoll_event event = {
.data.ptr = conn,
.events =
(conn->state == lnm_loop_state_req ? EPOLLIN : EPOLLOUT) |
EPOLLET | EPOLLONESHOT};
epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, conn->fd, &event);
}
lnm_loop_conn_schedule(l, conn);
}
}

View File

@ -1,3 +1,5 @@
#include <sys/epoll.h>
#include "lnm/loop.h"
lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap) {
@ -59,3 +61,24 @@ lnm_loop_conn *lnm_loop_queue_pop(lnm_loop_queue *q) {
return out;
}
void lnm_loop_worker_run(void *arg) {
lnm_loop *l = arg;
lnm_loop_queue *q = l->wq;
while (1) {
lnm_loop_conn *conn = lnm_loop_queue_pop(q);
switch (conn->state) {
case lnm_loop_state_req_blocking:
l->data_read(conn);
break;
case lnm_loop_state_res_blocking:
l->data_write(conn);
// Other states shouldn't even end up here, so we ignore them
default:;
}
lnm_loop_conn_schedule(l, conn);
}
}