Compare commits
No commits in common. "1f63f06e0c733d069457a30ad3852c11811f1612" and "854e8c3809bf97a3c395525d15b8a3fe00e89005" have entirely different histories.
1f63f06e0c
...
854e8c3809
|
@ -27,12 +27,12 @@ int main() {
|
||||||
ctx_reset,
|
ctx_reset,
|
||||||
ctx_free);
|
ctx_free);
|
||||||
|
|
||||||
lnm_http_step_append(&step, slow_step, true);
|
lnm_http_step_init(&step, slow_step);
|
||||||
lnm_http_route_init_literal(&route, lnm_http_method_get, "/", step);
|
lnm_http_route_init_literal(&route, lnm_http_method_get, "/", step);
|
||||||
lnm_http_loop_route_add(hl, route);
|
lnm_http_loop_route_add(hl, route);
|
||||||
|
|
||||||
lnm_log_init_global();
|
lnm_log_init_global();
|
||||||
lnm_log_register_stdout(lnm_log_level_debug);
|
lnm_log_register_stdout(lnm_log_level_debug);
|
||||||
|
|
||||||
printf("res = %i\n", lnm_http_loop_run(hl, 8080, 1, 2));
|
lnm_http_loop_run(hl, 8080, 1);
|
||||||
}
|
}
|
||||||
|
|
|
@ -32,16 +32,23 @@ lnm_err lnm_http_loop_init(lnm_http_loop **out, void *c_gctx,
|
||||||
lnm_http_ctx_reset_fn ctx_reset,
|
lnm_http_ctx_reset_fn ctx_reset,
|
||||||
lnm_http_ctx_free_fn ctx_free);
|
lnm_http_ctx_free_fn ctx_free);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize a new step.
|
||||||
|
*
|
||||||
|
* @param out where to store pointer to new `lnm_http_step`
|
||||||
|
* @param fn step function
|
||||||
|
*/
|
||||||
|
lnm_err lnm_http_step_init(lnm_http_step **out, lnm_http_step_fn fn);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Append the given step fn to the step.
|
* Append the given step fn to the step.
|
||||||
*
|
*
|
||||||
* @param out both the previous step to append the new step to, and the output
|
* @param out where to store pointer to new `lnm_http_step`
|
||||||
* variable to which the new step is appended
|
* @param step step to append new step to
|
||||||
* @param fn step function
|
* @param fn step function
|
||||||
* @param blocking whether the step is blocking or not
|
|
||||||
*/
|
*/
|
||||||
lnm_err lnm_http_step_append(lnm_http_step **out, lnm_http_step_fn fn,
|
lnm_err lnm_http_step_append(lnm_http_step **out, lnm_http_step *step,
|
||||||
bool blocking);
|
lnm_http_step_fn fn);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Initialize a new route of type literal.
|
* Initialize a new route of type literal.
|
||||||
|
@ -74,8 +81,7 @@ lnm_err lnm_http_route_init_regex(lnm_http_route **out, lnm_http_method method,
|
||||||
*/
|
*/
|
||||||
lnm_err lnm_http_loop_route_add(lnm_http_loop *hl, lnm_http_route *route);
|
lnm_err lnm_http_loop_route_add(lnm_http_loop *hl, lnm_http_route *route);
|
||||||
|
|
||||||
lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port,
|
lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port, int thread_count);
|
||||||
size_t epoll_threads, size_t worker_threads);
|
|
||||||
|
|
||||||
void lnm_http_loop_set_api_key(lnm_http_loop *hl, const char *api_key);
|
void lnm_http_loop_set_api_key(lnm_http_loop *hl, const char *api_key);
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
#ifndef LNM_LOOP
|
#ifndef LNM_LOOP
|
||||||
#define LNM_LOOP
|
#define LNM_LOOP
|
||||||
|
|
||||||
#include <pthread.h>
|
|
||||||
#include <stdatomic.h>
|
#include <stdatomic.h>
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
|
@ -9,19 +8,13 @@
|
||||||
#include "lnm/common.h"
|
#include "lnm/common.h"
|
||||||
|
|
||||||
#define LNM_LOOP_BUF_SIZE 2048
|
#define LNM_LOOP_BUF_SIZE 2048
|
||||||
#define LNM_QUEUE_MULTIPLIER 8
|
|
||||||
|
|
||||||
typedef enum lnm_loop_state {
|
typedef enum {
|
||||||
lnm_loop_state_req_io = 0,
|
lnm_loop_state_req = 0,
|
||||||
lnm_loop_state_res_io,
|
lnm_loop_state_res,
|
||||||
lnm_loop_state_end,
|
lnm_loop_state_end,
|
||||||
lnm_loop_state_req_work,
|
|
||||||
lnm_loop_state_res_work,
|
|
||||||
} lnm_loop_state;
|
} lnm_loop_state;
|
||||||
|
|
||||||
/**
|
|
||||||
* State for a currently active connection
|
|
||||||
*/
|
|
||||||
typedef struct lnm_loop_conn {
|
typedef struct lnm_loop_conn {
|
||||||
int fd;
|
int fd;
|
||||||
lnm_loop_state state;
|
lnm_loop_state state;
|
||||||
|
@ -37,38 +30,6 @@ typedef struct lnm_loop_conn {
|
||||||
} w;
|
} w;
|
||||||
} lnm_loop_conn;
|
} lnm_loop_conn;
|
||||||
|
|
||||||
/**
|
|
||||||
* Concurrent fixed-size queue used to distribute work among worker threads
|
|
||||||
*/
|
|
||||||
typedef struct lnm_loop_queue {
|
|
||||||
struct {
|
|
||||||
lnm_loop_conn **arr;
|
|
||||||
size_t len;
|
|
||||||
} buf;
|
|
||||||
size_t head;
|
|
||||||
size_t tail;
|
|
||||||
bool empty;
|
|
||||||
pthread_mutex_t mutex;
|
|
||||||
pthread_cond_t cond;
|
|
||||||
} lnm_loop_queue;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Initialize a new queue with the specified fixed capacity
|
|
||||||
*/
|
|
||||||
lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Queue the given connection. If the queue is currently full, this action
|
|
||||||
* blocks until there is space available.
|
|
||||||
*/
|
|
||||||
void lnm_loop_queue_push(lnm_loop_queue *q, lnm_loop_conn *conn);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Pop a connection from the queue. This action blocks until a connection is
|
|
||||||
* available.
|
|
||||||
*/
|
|
||||||
lnm_loop_conn *lnm_loop_queue_pop(lnm_loop_queue *q);
|
|
||||||
|
|
||||||
typedef struct lnm_loop {
|
typedef struct lnm_loop {
|
||||||
int listen_fd;
|
int listen_fd;
|
||||||
int epoll_fd;
|
int epoll_fd;
|
||||||
|
@ -78,13 +39,6 @@ typedef struct lnm_loop {
|
||||||
void (*ctx_free)(void *ctx);
|
void (*ctx_free)(void *ctx);
|
||||||
void (*data_read)(lnm_loop_conn *conn);
|
void (*data_read)(lnm_loop_conn *conn);
|
||||||
void (*data_write)(lnm_loop_conn *conn);
|
void (*data_write)(lnm_loop_conn *conn);
|
||||||
lnm_loop_queue *wq;
|
|
||||||
struct {
|
|
||||||
// Mutex shared between all threads; used for counting thread IDs
|
|
||||||
pthread_mutex_t mutex;
|
|
||||||
size_t worker_count;
|
|
||||||
size_t epoll_count;
|
|
||||||
} threads;
|
|
||||||
} lnm_loop;
|
} lnm_loop;
|
||||||
|
|
||||||
lnm_err lnm_loop_init(lnm_loop **out, void *gctx,
|
lnm_err lnm_loop_init(lnm_loop **out, void *gctx,
|
||||||
|
@ -95,46 +49,6 @@ lnm_err lnm_loop_init(lnm_loop **out, void *gctx,
|
||||||
|
|
||||||
lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port);
|
lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port);
|
||||||
|
|
||||||
/**
|
lnm_err lnm_loop_run(lnm_loop *l, int thread_count);
|
||||||
* Run a single epoll thread of the event loop.
|
|
||||||
*/
|
|
||||||
lnm_err lnm_loop_run(lnm_loop *l);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Run a multithreaded event loop with the configured number of threads.
|
|
||||||
*/
|
|
||||||
lnm_err lnm_loop_run_multi(lnm_loop *l, size_t epoll_threads,
|
|
||||||
size_t worker_threads);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Advance the processing of the given connection.
|
|
||||||
*
|
|
||||||
* Behavior of this function depends on both the connection state and whether
|
|
||||||
* worker threads are enabled.
|
|
||||||
*
|
|
||||||
* For IO states, this function will perform network I/O along with executing
|
|
||||||
* the loop's respective processing functions.
|
|
||||||
*
|
|
||||||
* For work states, the respective processing functions are executed without
|
|
||||||
* performing any network I/O. If no worker queue is present, this function
|
|
||||||
* performs all blocking work until an I/O or the end state is reached. If there
|
|
||||||
* is a worker queue present, only one block of work is done before exiting,
|
|
||||||
* allowing further blocks of work to be scheduled on other worker threads.
|
|
||||||
*
|
|
||||||
* If no worker queue is present, this function will only exit once an I/O or
|
|
||||||
* end state is reached.
|
|
||||||
*/
|
|
||||||
void lnm_loop_conn_advance(lnm_loop *l, lnm_loop_conn *conn);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Reschedule the given connection, either on the event loop for network IO or
|
|
||||||
* on a worker thread for blocking work. Connections are terminated as needed.
|
|
||||||
*/
|
|
||||||
void lnm_loop_conn_schedule(lnm_loop *l, lnm_loop_conn *conn);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Main loop executed on the worker threads.
|
|
||||||
*/
|
|
||||||
void lnm_loop_worker_run(void *arg);
|
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -8,7 +8,6 @@
|
||||||
typedef struct lnm_http_step {
|
typedef struct lnm_http_step {
|
||||||
lnm_http_step_fn fn;
|
lnm_http_step_fn fn;
|
||||||
struct lnm_http_step *next;
|
struct lnm_http_step *next;
|
||||||
bool blocking;
|
|
||||||
} lnm_http_step;
|
} lnm_http_step;
|
||||||
|
|
||||||
typedef enum lnm_http_route_type {
|
typedef enum lnm_http_route_type {
|
||||||
|
|
|
@ -9,6 +9,6 @@ void lnm_loop_conn_free(lnm_loop *l, lnm_loop_conn *conn);
|
||||||
|
|
||||||
lnm_err lnm_loop_accept(lnm_loop *l);
|
lnm_err lnm_loop_accept(lnm_loop *l);
|
||||||
|
|
||||||
void lnm_loop_conn_advance(lnm_loop *l, lnm_loop_conn *conn);
|
void lnm_loop_conn_io(lnm_loop *l, lnm_loop_conn *conn);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -28,8 +28,7 @@ lnm_err lnm_http_loop_init(lnm_http_loop **out, void *c_gctx,
|
||||||
return lnm_err_ok;
|
return lnm_err_ok;
|
||||||
}
|
}
|
||||||
|
|
||||||
lnm_err lnm_http_step_append(lnm_http_step **out, lnm_http_step_fn fn,
|
lnm_err lnm_http_step_init(lnm_http_step **out, lnm_http_step_fn fn) {
|
||||||
bool blocking) {
|
|
||||||
lnm_http_step *step = calloc(1, sizeof(lnm_http_step));
|
lnm_http_step *step = calloc(1, sizeof(lnm_http_step));
|
||||||
|
|
||||||
if (step == NULL) {
|
if (step == NULL) {
|
||||||
|
@ -37,13 +36,18 @@ lnm_err lnm_http_step_append(lnm_http_step **out, lnm_http_step_fn fn,
|
||||||
}
|
}
|
||||||
|
|
||||||
step->fn = fn;
|
step->fn = fn;
|
||||||
step->blocking = blocking;
|
*out = step;
|
||||||
|
|
||||||
if ((*out) != NULL) {
|
return lnm_err_ok;
|
||||||
(*out)->next = step;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
*out = step;
|
lnm_err lnm_http_step_append(lnm_http_step **out, lnm_http_step *step,
|
||||||
|
lnm_http_step_fn fn) {
|
||||||
|
LNM_RES(lnm_http_step_init(out, fn));
|
||||||
|
|
||||||
|
if (step != NULL) {
|
||||||
|
step->next = *out;
|
||||||
|
}
|
||||||
|
|
||||||
return lnm_err_ok;
|
return lnm_err_ok;
|
||||||
}
|
}
|
||||||
|
@ -118,10 +122,9 @@ lnm_err lnm_http_loop_route_add(lnm_http_loop *hl, lnm_http_route *route) {
|
||||||
return lnm_err_ok;
|
return lnm_err_ok;
|
||||||
}
|
}
|
||||||
|
|
||||||
lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port,
|
lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port, int thread_count) {
|
||||||
size_t epoll_threads, size_t worker_threads) {
|
|
||||||
LNM_RES(lnm_loop_setup(hl, port));
|
LNM_RES(lnm_loop_setup(hl, port));
|
||||||
return lnm_loop_run_multi(hl, epoll_threads, worker_threads);
|
return lnm_loop_run(hl, thread_count);
|
||||||
}
|
}
|
||||||
|
|
||||||
void lnm_http_loop_set_api_key(lnm_http_loop *hl, const char *api_key) {
|
void lnm_http_loop_set_api_key(lnm_http_loop *hl, const char *api_key) {
|
||||||
|
|
|
@ -133,19 +133,11 @@ void lnm_http_loop_process_steps(lnm_http_conn *conn) {
|
||||||
while ((ctx->cur_step != NULL) && (step != ctx->cur_step)) {
|
while ((ctx->cur_step != NULL) && (step != ctx->cur_step)) {
|
||||||
step = ctx->cur_step;
|
step = ctx->cur_step;
|
||||||
|
|
||||||
if (step->blocking && (conn->state != lnm_loop_state_req_work)) {
|
|
||||||
conn->state = lnm_loop_state_req_work;
|
|
||||||
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
switch (step->fn(conn)) {
|
switch (step->fn(conn)) {
|
||||||
case lnm_http_step_err_done:
|
case lnm_http_step_err_done:
|
||||||
ctx->cur_step = ctx->cur_step->next;
|
ctx->cur_step = ctx->cur_step->next;
|
||||||
break;
|
break;
|
||||||
case lnm_http_step_err_io_needed:
|
case lnm_http_step_err_io_needed:
|
||||||
// Ensure steps that require more I/O are executed on the event loop
|
|
||||||
conn->state = lnm_loop_state_req_io;
|
|
||||||
break;
|
break;
|
||||||
case lnm_http_step_err_close:
|
case lnm_http_step_err_close:
|
||||||
conn->state = lnm_loop_state_end;
|
conn->state = lnm_loop_state_end;
|
||||||
|
@ -157,7 +149,6 @@ void lnm_http_loop_process_steps(lnm_http_conn *conn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ctx->cur_step == NULL) {
|
if (ctx->cur_step == NULL) {
|
||||||
conn->state = lnm_loop_state_res_io;
|
|
||||||
ctx->state = lnm_http_loop_state_add_headers;
|
ctx->state = lnm_http_loop_state_add_headers;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -334,23 +325,23 @@ void (*process_fns[])(lnm_http_conn *conn) = {
|
||||||
|
|
||||||
lnm_loop_state state_map[] = {
|
lnm_loop_state state_map[] = {
|
||||||
// parse_req
|
// parse_req
|
||||||
lnm_loop_state_req_io,
|
lnm_loop_state_req,
|
||||||
// route
|
// route
|
||||||
lnm_loop_state_req_io,
|
lnm_loop_state_req,
|
||||||
// parse_headers
|
// parse_headers
|
||||||
lnm_loop_state_req_io,
|
lnm_loop_state_req,
|
||||||
// steps
|
// steps
|
||||||
lnm_loop_state_req_io,
|
lnm_loop_state_req,
|
||||||
// add_headers
|
// add_headers
|
||||||
lnm_loop_state_req_io,
|
lnm_loop_state_req,
|
||||||
// write_status_line
|
// write_status_line
|
||||||
lnm_loop_state_res_io,
|
lnm_loop_state_res,
|
||||||
// write_headers
|
// write_headers
|
||||||
lnm_loop_state_res_io,
|
lnm_loop_state_res,
|
||||||
// write_body
|
// write_body
|
||||||
lnm_loop_state_res_io,
|
lnm_loop_state_res,
|
||||||
// finish
|
// finish
|
||||||
lnm_loop_state_res_io,
|
lnm_loop_state_res,
|
||||||
};
|
};
|
||||||
|
|
||||||
void lnm_http_loop_process(lnm_http_conn *conn) {
|
void lnm_http_loop_process(lnm_http_conn *conn) {
|
||||||
|
@ -378,7 +369,7 @@ void lnm_http_loop_process(lnm_http_conn *conn) {
|
||||||
|
|
||||||
// We move the request to a dedicated buffer if the read buffer needs to be
|
// We move the request to a dedicated buffer if the read buffer needs to be
|
||||||
// reused
|
// reused
|
||||||
if ((conn->state == lnm_loop_state_req_io) && (conn->state == loop_state) &&
|
if ((conn->state == lnm_loop_state_req) && (conn->state == loop_state) &&
|
||||||
(!ctx->req.buf.owned) && (ctx->req.buf.len > 0)) {
|
(!ctx->req.buf.owned) && (ctx->req.buf.len > 0)) {
|
||||||
char *buf = malloc(ctx->req.buf.len);
|
char *buf = malloc(ctx->req.buf.len);
|
||||||
|
|
||||||
|
|
|
@ -30,8 +30,6 @@ lnm_err lnm_loop_init(lnm_loop **out, void *gctx,
|
||||||
l->data_read = data_read;
|
l->data_read = data_read;
|
||||||
l->data_write = data_write;
|
l->data_write = data_write;
|
||||||
|
|
||||||
pthread_mutex_init(&l->threads.mutex, NULL);
|
|
||||||
|
|
||||||
*out = l;
|
*out = l;
|
||||||
|
|
||||||
return lnm_err_ok;
|
return lnm_err_ok;
|
||||||
|
@ -55,7 +53,7 @@ lnm_err lnm_loop_accept(lnm_loop *l) {
|
||||||
LNM_RES2(lnm_loop_conn_init(&conn, l), close(conn_fd));
|
LNM_RES2(lnm_loop_conn_init(&conn, l), close(conn_fd));
|
||||||
|
|
||||||
conn->fd = conn_fd;
|
conn->fd = conn_fd;
|
||||||
conn->state = lnm_loop_state_req_io;
|
conn->state = lnm_loop_state_req;
|
||||||
|
|
||||||
struct epoll_event event = {.data.ptr = conn,
|
struct epoll_event event = {.data.ptr = conn,
|
||||||
.events = EPOLLIN | EPOLLET | EPOLLONESHOT};
|
.events = EPOLLIN | EPOLLET | EPOLLONESHOT};
|
||||||
|
@ -64,10 +62,6 @@ lnm_err lnm_loop_accept(lnm_loop *l) {
|
||||||
|
|
||||||
l->open++;
|
l->open++;
|
||||||
|
|
||||||
// Make sure to re-arm the listening socket after accepting
|
|
||||||
event.data.ptr = NULL;
|
|
||||||
epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, l->listen_fd, &event);
|
|
||||||
|
|
||||||
lnm_ldebug(section, "connection opened with fd %i", conn_fd);
|
lnm_ldebug(section, "connection opened with fd %i", conn_fd);
|
||||||
|
|
||||||
return lnm_err_ok;
|
return lnm_err_ok;
|
||||||
|
@ -129,51 +123,19 @@ lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port) {
|
||||||
return lnm_err_ok;
|
return lnm_err_ok;
|
||||||
}
|
}
|
||||||
|
|
||||||
void lnm_loop_conn_schedule(lnm_loop *l, lnm_loop_conn *conn) {
|
typedef struct lnm_loop_thread_args {
|
||||||
switch (conn->state) {
|
lnm_loop *l;
|
||||||
// IO states get rescheduled in the epoll loop
|
int id;
|
||||||
case lnm_loop_state_req_io:
|
int thread_count;
|
||||||
case lnm_loop_state_res_io: {
|
} lnm_loop_thread_args;
|
||||||
struct epoll_event event = {
|
|
||||||
.data.ptr = conn,
|
|
||||||
.events = (conn->state == lnm_loop_state_req_io ? EPOLLIN : EPOLLOUT) |
|
|
||||||
EPOLLET | EPOLLONESHOT};
|
|
||||||
|
|
||||||
epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, conn->fd, &event);
|
lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) {
|
||||||
} break;
|
lnm_loop *l = args->l;
|
||||||
case lnm_loop_state_req_work:
|
int thread_id = args->id;
|
||||||
case lnm_loop_state_res_work:
|
int thread_count = args->thread_count;
|
||||||
lnm_loop_queue_push(l->wq, conn);
|
|
||||||
break;
|
|
||||||
case lnm_loop_state_end: {
|
|
||||||
int conn_fd = conn->fd;
|
|
||||||
|
|
||||||
lnm_loop_conn_free(l, conn);
|
|
||||||
close(conn_fd);
|
|
||||||
l->open--;
|
|
||||||
|
|
||||||
epoll_ctl(l->epoll_fd, EPOLL_CTL_DEL, conn_fd, NULL);
|
|
||||||
|
|
||||||
lnm_ldebug(section, "connection closed with fd %i", conn_fd);
|
|
||||||
} break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
lnm_err lnm_loop_run(lnm_loop *l) {
|
|
||||||
if (l->epoll_fd == 0) {
|
|
||||||
return lnm_err_not_setup;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get thread ID by incrementing counter
|
|
||||||
pthread_mutex_lock(&l->threads.mutex);
|
|
||||||
|
|
||||||
int thread_id = l->threads.epoll_count;
|
|
||||||
l->threads.epoll_count++;
|
|
||||||
|
|
||||||
pthread_mutex_unlock(&l->threads.mutex);
|
|
||||||
|
|
||||||
struct epoll_event *events = calloc(1, sizeof(struct epoll_event));
|
struct epoll_event *events = calloc(1, sizeof(struct epoll_event));
|
||||||
size_t events_cap = 1;
|
int events_cap = 1;
|
||||||
|
|
||||||
if (events == NULL) {
|
if (events == NULL) {
|
||||||
return lnm_err_failed_alloc;
|
return lnm_err_failed_alloc;
|
||||||
|
@ -181,6 +143,9 @@ lnm_err lnm_loop_run(lnm_loop *l) {
|
||||||
|
|
||||||
lnm_linfo(section, "thread %i started", thread_id);
|
lnm_linfo(section, "thread %i started", thread_id);
|
||||||
|
|
||||||
|
struct epoll_event listen_event = {
|
||||||
|
.data.ptr = NULL, .events = EPOLLIN | EPOLLET | EPOLLONESHOT};
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
int polled = epoll_wait(l->epoll_fd, events, events_cap, -1);
|
int polled = epoll_wait(l->epoll_fd, events, events_cap, -1);
|
||||||
lnm_ldebug(section, "polled (thread %i): %i", thread_id, polled);
|
lnm_ldebug(section, "polled (thread %i): %i", thread_id, polled);
|
||||||
|
@ -192,19 +157,36 @@ lnm_err lnm_loop_run(lnm_loop *l) {
|
||||||
for (int i = 0; i < polled; i++) {
|
for (int i = 0; i < polled; i++) {
|
||||||
if (events[i].data.ptr == NULL) {
|
if (events[i].data.ptr == NULL) {
|
||||||
lnm_loop_accept(l);
|
lnm_loop_accept(l);
|
||||||
|
|
||||||
|
epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, l->listen_fd, &listen_event);
|
||||||
} else {
|
} else {
|
||||||
lnm_loop_conn *conn = events[i].data.ptr;
|
lnm_loop_conn *conn = events[i].data.ptr;
|
||||||
|
lnm_loop_conn_io(l, conn);
|
||||||
|
|
||||||
// At this point, state is always an IO state
|
if (conn->state == lnm_loop_state_end) {
|
||||||
lnm_loop_conn_advance(l, conn);
|
int conn_fd = conn->fd;
|
||||||
lnm_loop_conn_schedule(l, conn);
|
|
||||||
|
lnm_loop_conn_free(l, conn);
|
||||||
|
close(conn_fd);
|
||||||
|
l->open--;
|
||||||
|
|
||||||
|
epoll_ctl(l->epoll_fd, EPOLL_CTL_DEL, conn_fd, NULL);
|
||||||
|
|
||||||
|
lnm_ldebug(section, "connection closed with fd %i", conn_fd);
|
||||||
|
} else {
|
||||||
|
struct epoll_event event = {
|
||||||
|
.data.ptr = conn,
|
||||||
|
.events =
|
||||||
|
(conn->state == lnm_loop_state_req ? EPOLLIN : EPOLLOUT) |
|
||||||
|
EPOLLET | EPOLLONESHOT};
|
||||||
|
epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, conn->fd, &event);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
size_t open = l->open;
|
int open = l->open;
|
||||||
size_t cap_per_thread = open + 1 > l->threads.epoll_count
|
int cap_per_thread =
|
||||||
? (open + 1) / l->threads.epoll_count
|
open + 1 > thread_count ? (open + 1) / thread_count : 1;
|
||||||
: 1;
|
|
||||||
|
|
||||||
if (cap_per_thread > events_cap) {
|
if (cap_per_thread > events_cap) {
|
||||||
struct epoll_event *new_events =
|
struct epoll_event *new_events =
|
||||||
|
@ -223,21 +205,28 @@ lnm_err lnm_loop_run(lnm_loop *l) {
|
||||||
return lnm_err_ok;
|
return lnm_err_ok;
|
||||||
}
|
}
|
||||||
|
|
||||||
lnm_err lnm_loop_run_multi(lnm_loop *l, size_t epoll_threads,
|
lnm_err lnm_loop_run(lnm_loop *l, int thread_count) {
|
||||||
size_t worker_threads) {
|
if (l->epoll_fd == 0) {
|
||||||
if (worker_threads > 0) {
|
return lnm_err_not_setup;
|
||||||
LNM_RES(lnm_loop_queue_init(&l->wq, LNM_QUEUE_MULTIPLIER * worker_threads));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pthread_t t;
|
lnm_loop_thread_args args[thread_count];
|
||||||
|
|
||||||
for (size_t i = 1; i < epoll_threads; i++) {
|
for (int i = 1; i < thread_count; i++) {
|
||||||
pthread_create(&t, NULL, (void *(*)(void *))lnm_loop_run, l);
|
args[i].l = l;
|
||||||
|
args[i].id = i;
|
||||||
|
args[i].thread_count = thread_count;
|
||||||
|
|
||||||
|
pthread_t thread;
|
||||||
|
pthread_create(&thread, NULL, (void *(*)(void *))lnm_loop_run_thread,
|
||||||
|
&args[i]);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (size_t i = 0; i < worker_threads; i++) {
|
args[0].l = l;
|
||||||
pthread_create(&t, NULL, (void *(*)(void *))lnm_loop_worker_run, l);
|
args[0].id = 0;
|
||||||
}
|
args[0].thread_count = thread_count;
|
||||||
|
|
||||||
return lnm_loop_run(l);
|
lnm_loop_run_thread(&args[0]);
|
||||||
|
|
||||||
|
return lnm_err_ok;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
#include <errno.h>
|
#include <errno.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <sys/socket.h>
|
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
|
||||||
#include "lnm/loop.h"
|
#include "lnm/loop.h"
|
||||||
|
@ -34,7 +33,7 @@ void lnm_loop_conn_io_req(lnm_loop *l, lnm_loop_conn *conn) {
|
||||||
|
|
||||||
conn->r.size += res;
|
conn->r.size += res;
|
||||||
l->data_read(conn);
|
l->data_read(conn);
|
||||||
} while (conn->state == lnm_loop_state_req_io);
|
} while (conn->state == lnm_loop_state_req);
|
||||||
}
|
}
|
||||||
|
|
||||||
void lnm_loop_conn_io_res(lnm_loop *l, lnm_loop_conn *conn) {
|
void lnm_loop_conn_io_res(lnm_loop *l, lnm_loop_conn *conn) {
|
||||||
|
@ -44,9 +43,7 @@ void lnm_loop_conn_io_res(lnm_loop *l, lnm_loop_conn *conn) {
|
||||||
ssize_t res;
|
ssize_t res;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
// Send with MSG_NOSIGNAL prevents closed pipes from exiting the program
|
res = write(conn->fd, conn->w.buf, conn->w.size);
|
||||||
// with SIGPIPE
|
|
||||||
res = send(conn->fd, conn->w.buf, conn->w.size, MSG_NOSIGNAL);
|
|
||||||
} while (res < 0 && errno == EINTR);
|
} while (res < 0 && errno == EINTR);
|
||||||
|
|
||||||
// Write can't be performed without blocking; we come back later
|
// Write can't be performed without blocking; we come back later
|
||||||
|
@ -64,33 +61,17 @@ void lnm_loop_conn_io_res(lnm_loop *l, lnm_loop_conn *conn) {
|
||||||
// writer function more space to work with
|
// writer function more space to work with
|
||||||
memmove(conn->w.buf, &conn->w.buf[res], conn->w.size - res);
|
memmove(conn->w.buf, &conn->w.buf[res], conn->w.size - res);
|
||||||
conn->w.size -= res;
|
conn->w.size -= res;
|
||||||
} while (conn->state == lnm_loop_state_res_io);
|
} while (conn->state == lnm_loop_state_res);
|
||||||
}
|
}
|
||||||
|
|
||||||
void lnm_loop_conn_advance(lnm_loop *l, lnm_loop_conn *conn) {
|
void lnm_loop_conn_io(lnm_loop *l, lnm_loop_conn *conn) {
|
||||||
do {
|
|
||||||
|
|
||||||
switch (conn->state) {
|
switch (conn->state) {
|
||||||
case lnm_loop_state_req_io:
|
case lnm_loop_state_req:
|
||||||
lnm_loop_conn_io_req(l, conn);
|
lnm_loop_conn_io_req(l, conn);
|
||||||
break;
|
break;
|
||||||
case lnm_loop_state_res_io:
|
case lnm_loop_state_res:
|
||||||
lnm_loop_conn_io_res(l, conn);
|
lnm_loop_conn_io_res(l, conn);
|
||||||
break;
|
break;
|
||||||
case lnm_loop_state_req_work:
|
|
||||||
do {
|
|
||||||
l->data_read(conn);
|
|
||||||
} while (conn->state == lnm_loop_state_req_work);
|
|
||||||
break;
|
|
||||||
case lnm_loop_state_res_work:
|
|
||||||
do {
|
|
||||||
l->data_write(conn);
|
|
||||||
} while (conn->state == lnm_loop_state_res_work);
|
|
||||||
break;
|
|
||||||
default:;
|
default:;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Execute all blocking work if we're running in single-threaded mode
|
|
||||||
while (l->wq == NULL && (conn->state == lnm_loop_state_req_work ||
|
|
||||||
conn->state == lnm_loop_state_res_work));
|
|
||||||
}
|
|
||||||
|
|
|
@ -1,92 +0,0 @@
|
||||||
#include <sys/epoll.h>
|
|
||||||
|
|
||||||
#include "lnm/log.h"
|
|
||||||
#include "lnm/loop.h"
|
|
||||||
|
|
||||||
lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap) {
|
|
||||||
lnm_loop_conn **arr = calloc(cap, sizeof(lnm_loop_conn *));
|
|
||||||
|
|
||||||
if (arr == NULL) {
|
|
||||||
return lnm_err_failed_alloc;
|
|
||||||
}
|
|
||||||
|
|
||||||
lnm_loop_queue *q = calloc(1, sizeof(lnm_loop_queue));
|
|
||||||
|
|
||||||
if (q == NULL) {
|
|
||||||
free(arr);
|
|
||||||
|
|
||||||
return lnm_err_failed_alloc;
|
|
||||||
}
|
|
||||||
|
|
||||||
q->buf.arr = arr;
|
|
||||||
q->buf.len = cap;
|
|
||||||
|
|
||||||
q->tail = 0;
|
|
||||||
q->head = 0;
|
|
||||||
q->empty = true;
|
|
||||||
|
|
||||||
pthread_mutex_init(&q->mutex, NULL);
|
|
||||||
pthread_cond_init(&q->cond, NULL);
|
|
||||||
|
|
||||||
*out = q;
|
|
||||||
|
|
||||||
return lnm_err_ok;
|
|
||||||
}
|
|
||||||
|
|
||||||
void lnm_loop_queue_push(lnm_loop_queue *q, lnm_loop_conn *conn) {
|
|
||||||
pthread_mutex_lock(&q->mutex);
|
|
||||||
|
|
||||||
while (q->head == q->tail && !q->empty) {
|
|
||||||
pthread_cond_wait(&q->cond, &q->mutex);
|
|
||||||
}
|
|
||||||
|
|
||||||
q->buf.arr[q->head] = conn;
|
|
||||||
|
|
||||||
// Make sure the index wraps around
|
|
||||||
q->head = (q->head + 1) % q->buf.len;
|
|
||||||
q->empty = false;
|
|
||||||
|
|
||||||
// Unlock mutex and signal to waiting threads
|
|
||||||
pthread_mutex_unlock(&q->mutex);
|
|
||||||
pthread_cond_signal(&q->cond);
|
|
||||||
}
|
|
||||||
|
|
||||||
lnm_loop_conn *lnm_loop_queue_pop(lnm_loop_queue *q) {
|
|
||||||
pthread_mutex_lock(&q->mutex);
|
|
||||||
|
|
||||||
while (q->empty) {
|
|
||||||
pthread_cond_wait(&q->cond, &q->mutex);
|
|
||||||
}
|
|
||||||
|
|
||||||
lnm_loop_conn *out = q->buf.arr[q->tail];
|
|
||||||
|
|
||||||
q->tail = (q->tail + 1) % q->buf.len;
|
|
||||||
q->empty = q->tail == q->head;
|
|
||||||
|
|
||||||
// Unlock mutex and signal to waiting threads
|
|
||||||
pthread_mutex_unlock(&q->mutex);
|
|
||||||
pthread_cond_signal(&q->cond);
|
|
||||||
|
|
||||||
return out;
|
|
||||||
}
|
|
||||||
|
|
||||||
void lnm_loop_worker_run(void *arg) {
|
|
||||||
lnm_loop *l = arg;
|
|
||||||
lnm_loop_queue *q = l->wq;
|
|
||||||
|
|
||||||
// Get thread ID by incrementing counter
|
|
||||||
pthread_mutex_lock(&l->threads.mutex);
|
|
||||||
|
|
||||||
int thread_id = l->threads.worker_count;
|
|
||||||
l->threads.worker_count++;
|
|
||||||
|
|
||||||
pthread_mutex_unlock(&l->threads.mutex);
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
lnm_loop_conn *conn = lnm_loop_queue_pop(q);
|
|
||||||
lnm_ldebug("loop", "worker %i processing fd %i", thread_id, conn->fd);
|
|
||||||
|
|
||||||
lnm_loop_conn_advance(l, conn);
|
|
||||||
lnm_loop_conn_schedule(l, conn);
|
|
||||||
}
|
|
||||||
}
|
|
Loading…
Reference in New Issue