#ifndef LNM_LOOP #define LNM_LOOP #include #include #include #include #include "lnm/common.h" #define LNM_LOOP_BUF_SIZE 2048 #define LNM_QUEUE_MULTIPLIER 8 typedef enum lnm_loop_state { lnm_loop_state_req_io = 0, lnm_loop_state_res_io, lnm_loop_state_end, lnm_loop_state_req_work, lnm_loop_state_res_work, } lnm_loop_state; /** * State for a currently active connection */ typedef struct lnm_loop_conn { int fd; lnm_loop_state state; void *ctx; struct { char buf[LNM_LOOP_BUF_SIZE]; size_t size; size_t read; } r; struct { char buf[LNM_LOOP_BUF_SIZE]; size_t size; } w; } lnm_loop_conn; /** * Concurrent fixed-size queue used to distribute work among worker threads */ typedef struct lnm_loop_queue { struct { lnm_loop_conn **arr; size_t len; } buf; size_t head; size_t tail; bool empty; pthread_mutex_t mutex; pthread_cond_t cond; } lnm_loop_queue; /** * Initialize a new queue with the specified fixed capacity */ lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap); /** * Queue the given connection. If the queue is currently full, this action * blocks until there is space available. */ void lnm_loop_queue_push(lnm_loop_queue *q, lnm_loop_conn *conn); /** * Pop a connection from the queue. This action blocks until a connection is * available. */ lnm_loop_conn *lnm_loop_queue_pop(lnm_loop_queue *q); typedef struct lnm_loop { int listen_fd; int epoll_fd; atomic_int open; void *gctx; lnm_err (*ctx_init)(void **out, void *gctx); void (*ctx_free)(void *ctx); void (*data_read)(lnm_loop_conn *conn); void (*data_write)(lnm_loop_conn *conn); lnm_loop_queue *wq; struct { // Mutex shared between all threads; used for counting thread IDs pthread_mutex_t mutex; size_t worker_count; size_t epoll_count; } threads; } lnm_loop; lnm_err lnm_loop_init(lnm_loop **out, void *gctx, lnm_err (*ctx_init)(void **out, void *gctx), void (*ctx_free)(void *ctx), void (*data_read)(lnm_loop_conn *conn), void (*data_write)(lnm_loop_conn *conn)); lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port); /** * Run a single epoll thread of the event loop. */ lnm_err lnm_loop_run(lnm_loop *l); /** * Run a multithreaded event loop with the configured number of threads. */ lnm_err lnm_loop_run_multi(lnm_loop *l, size_t epoll_threads, size_t worker_threads); /** * Advance the processing of the given connection. * * Behavior of this function depends on both the connection state and whether * worker threads are enabled. * * For IO states, this function will perform network I/O along with executing * the loop's respective processing functions. * * For work states, the respective processing functions are executed without * performing any network I/O. If no worker queue is present, this function * performs all blocking work until an I/O or the end state is reached. If there * is a worker queue present, only one block of work is done before exiting, * allowing further blocks of work to be scheduled on other worker threads. * * If no worker queue is present, this function will only exit once an I/O or * end state is reached. */ void lnm_loop_conn_advance(lnm_loop *l, lnm_loop_conn *conn); /** * Reschedule the given connection, either on the event loop for network IO or * on a worker thread for blocking work. Connections are terminated as needed. */ void lnm_loop_conn_schedule(lnm_loop *l, lnm_loop_conn *conn); /** * Main loop executed on the worker threads. */ void lnm_loop_worker_run(void *arg); #endif