141 lines
3.6 KiB
C
141 lines
3.6 KiB
C
#ifndef LNM_LOOP
|
|
#define LNM_LOOP
|
|
|
|
#include <pthread.h>
|
|
#include <stdatomic.h>
|
|
#include <stdint.h>
|
|
#include <stdlib.h>
|
|
|
|
#include "lnm/common.h"
|
|
|
|
#define LNM_LOOP_BUF_SIZE 2048
|
|
#define LNM_QUEUE_MULTIPLIER 8
|
|
|
|
typedef enum lnm_loop_state {
|
|
lnm_loop_state_req_io = 0,
|
|
lnm_loop_state_res_io,
|
|
lnm_loop_state_end,
|
|
lnm_loop_state_req_work,
|
|
lnm_loop_state_res_work,
|
|
} lnm_loop_state;
|
|
|
|
/**
|
|
* State for a currently active connection
|
|
*/
|
|
typedef struct lnm_loop_conn {
|
|
int fd;
|
|
lnm_loop_state state;
|
|
void *ctx;
|
|
struct {
|
|
char buf[LNM_LOOP_BUF_SIZE];
|
|
size_t size;
|
|
size_t read;
|
|
} r;
|
|
struct {
|
|
char buf[LNM_LOOP_BUF_SIZE];
|
|
size_t size;
|
|
} w;
|
|
} lnm_loop_conn;
|
|
|
|
/**
|
|
* Concurrent fixed-size queue used to distribute work among worker threads
|
|
*/
|
|
typedef struct lnm_loop_queue {
|
|
struct {
|
|
lnm_loop_conn **arr;
|
|
size_t len;
|
|
} buf;
|
|
size_t head;
|
|
size_t tail;
|
|
bool empty;
|
|
pthread_mutex_t mutex;
|
|
pthread_cond_t cond;
|
|
} lnm_loop_queue;
|
|
|
|
/**
|
|
* Initialize a new queue with the specified fixed capacity
|
|
*/
|
|
lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap);
|
|
|
|
/**
|
|
* Queue the given connection. If the queue is currently full, this action
|
|
* blocks until there is space available.
|
|
*/
|
|
void lnm_loop_queue_push(lnm_loop_queue *q, lnm_loop_conn *conn);
|
|
|
|
/**
|
|
* Pop a connection from the queue. This action blocks until a connection is
|
|
* available.
|
|
*/
|
|
lnm_loop_conn *lnm_loop_queue_pop(lnm_loop_queue *q);
|
|
|
|
typedef struct lnm_loop {
|
|
int listen_fd;
|
|
int epoll_fd;
|
|
atomic_int open;
|
|
void *gctx;
|
|
lnm_err (*ctx_init)(void **out, void *gctx);
|
|
void (*ctx_free)(void *ctx);
|
|
void (*data_read)(lnm_loop_conn *conn);
|
|
void (*data_write)(lnm_loop_conn *conn);
|
|
lnm_loop_queue *wq;
|
|
struct {
|
|
// Mutex shared between all threads; used for counting thread IDs
|
|
pthread_mutex_t mutex;
|
|
size_t worker_count;
|
|
size_t epoll_count;
|
|
} threads;
|
|
} lnm_loop;
|
|
|
|
lnm_err lnm_loop_init(lnm_loop **out, void *gctx,
|
|
lnm_err (*ctx_init)(void **out, void *gctx),
|
|
void (*ctx_free)(void *ctx),
|
|
void (*data_read)(lnm_loop_conn *conn),
|
|
void (*data_write)(lnm_loop_conn *conn));
|
|
|
|
lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port);
|
|
|
|
/**
|
|
* Run a single epoll thread of the event loop.
|
|
*/
|
|
lnm_err lnm_loop_run(lnm_loop *l);
|
|
|
|
/**
|
|
* Run a multithreaded event loop with the configured number of threads.
|
|
*/
|
|
lnm_err lnm_loop_run_multi(lnm_loop *l, size_t epoll_threads,
|
|
size_t worker_threads);
|
|
|
|
/**
|
|
* Advance the processing of the given connection.
|
|
*
|
|
* Behavior of this function depends on both the connection state and whether
|
|
* worker threads are enabled.
|
|
*
|
|
* For IO states, this function will perform network I/O along with executing
|
|
* the loop's respective processing functions.
|
|
*
|
|
* For work states, the respective processing functions are executed without
|
|
* performing any network I/O. If no worker queue is present, this function
|
|
* performs all blocking work until an I/O or the end state is reached. If there
|
|
* is a worker queue present, only one block of work is done before exiting,
|
|
* allowing further blocks of work to be scheduled on other worker threads.
|
|
*
|
|
* If no worker queue is present, this function will only exit once an I/O or
|
|
* end state is reached.
|
|
*/
|
|
void lnm_loop_conn_advance(lnm_loop *l, lnm_loop_conn *conn);
|
|
|
|
/**
|
|
* Reschedule the given connection, either on the event loop for network IO or
|
|
* on a worker thread for blocking work. Connections are terminated as needed.
|
|
*/
|
|
void lnm_loop_conn_schedule(lnm_loop *l, lnm_loop_conn *conn);
|
|
|
|
/**
|
|
* Main loop executed on the worker threads.
|
|
*/
|
|
void lnm_loop_worker_run(void *arg);
|
|
|
|
#endif
|