Compare commits

..

7 Commits

10 changed files with 320 additions and 111 deletions

View File

@ -27,12 +27,12 @@ int main() {
ctx_reset, ctx_reset,
ctx_free); ctx_free);
lnm_http_step_init(&step, slow_step); lnm_http_step_append(&step, slow_step, true);
lnm_http_route_init_literal(&route, lnm_http_method_get, "/", step); lnm_http_route_init_literal(&route, lnm_http_method_get, "/", step);
lnm_http_loop_route_add(hl, route); lnm_http_loop_route_add(hl, route);
lnm_log_init_global(); lnm_log_init_global();
lnm_log_register_stdout(lnm_log_level_debug); lnm_log_register_stdout(lnm_log_level_debug);
lnm_http_loop_run(hl, 8080, 1); printf("res = %i\n", lnm_http_loop_run(hl, 8080, 1, 2));
} }

View File

@ -32,23 +32,16 @@ lnm_err lnm_http_loop_init(lnm_http_loop **out, void *c_gctx,
lnm_http_ctx_reset_fn ctx_reset, lnm_http_ctx_reset_fn ctx_reset,
lnm_http_ctx_free_fn ctx_free); lnm_http_ctx_free_fn ctx_free);
/**
* Initialize a new step.
*
* @param out where to store pointer to new `lnm_http_step`
* @param fn step function
*/
lnm_err lnm_http_step_init(lnm_http_step **out, lnm_http_step_fn fn);
/** /**
* Append the given step fn to the step. * Append the given step fn to the step.
* *
* @param out where to store pointer to new `lnm_http_step` * @param out both the previous step to append the new step to, and the output
* @param step step to append new step to * variable to which the new step is appended
* @param fn step function * @param fn step function
* @param blocking whether the step is blocking or not
*/ */
lnm_err lnm_http_step_append(lnm_http_step **out, lnm_http_step *step, lnm_err lnm_http_step_append(lnm_http_step **out, lnm_http_step_fn fn,
lnm_http_step_fn fn); bool blocking);
/** /**
* Initialize a new route of type literal. * Initialize a new route of type literal.
@ -81,7 +74,8 @@ lnm_err lnm_http_route_init_regex(lnm_http_route **out, lnm_http_method method,
*/ */
lnm_err lnm_http_loop_route_add(lnm_http_loop *hl, lnm_http_route *route); lnm_err lnm_http_loop_route_add(lnm_http_loop *hl, lnm_http_route *route);
lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port, int thread_count); lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port,
size_t epoll_threads, size_t worker_threads);
void lnm_http_loop_set_api_key(lnm_http_loop *hl, const char *api_key); void lnm_http_loop_set_api_key(lnm_http_loop *hl, const char *api_key);

View File

@ -1,6 +1,7 @@
#ifndef LNM_LOOP #ifndef LNM_LOOP
#define LNM_LOOP #define LNM_LOOP
#include <pthread.h>
#include <stdatomic.h> #include <stdatomic.h>
#include <stdint.h> #include <stdint.h>
#include <stdlib.h> #include <stdlib.h>
@ -8,13 +9,19 @@
#include "lnm/common.h" #include "lnm/common.h"
#define LNM_LOOP_BUF_SIZE 2048 #define LNM_LOOP_BUF_SIZE 2048
#define LNM_QUEUE_MULTIPLIER 8
typedef enum { typedef enum lnm_loop_state {
lnm_loop_state_req = 0, lnm_loop_state_req_io = 0,
lnm_loop_state_res, lnm_loop_state_res_io,
lnm_loop_state_end, lnm_loop_state_end,
lnm_loop_state_req_work,
lnm_loop_state_res_work,
} lnm_loop_state; } lnm_loop_state;
/**
* State for a currently active connection
*/
typedef struct lnm_loop_conn { typedef struct lnm_loop_conn {
int fd; int fd;
lnm_loop_state state; lnm_loop_state state;
@ -30,6 +37,38 @@ typedef struct lnm_loop_conn {
} w; } w;
} lnm_loop_conn; } lnm_loop_conn;
/**
* Concurrent fixed-size queue used to distribute work among worker threads
*/
typedef struct lnm_loop_queue {
struct {
lnm_loop_conn **arr;
size_t len;
} buf;
size_t head;
size_t tail;
bool empty;
pthread_mutex_t mutex;
pthread_cond_t cond;
} lnm_loop_queue;
/**
* Initialize a new queue with the specified fixed capacity
*/
lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap);
/**
* Queue the given connection. If the queue is currently full, this action
* blocks until there is space available.
*/
void lnm_loop_queue_push(lnm_loop_queue *q, lnm_loop_conn *conn);
/**
* Pop a connection from the queue. This action blocks until a connection is
* available.
*/
lnm_loop_conn *lnm_loop_queue_pop(lnm_loop_queue *q);
typedef struct lnm_loop { typedef struct lnm_loop {
int listen_fd; int listen_fd;
int epoll_fd; int epoll_fd;
@ -39,6 +78,13 @@ typedef struct lnm_loop {
void (*ctx_free)(void *ctx); void (*ctx_free)(void *ctx);
void (*data_read)(lnm_loop_conn *conn); void (*data_read)(lnm_loop_conn *conn);
void (*data_write)(lnm_loop_conn *conn); void (*data_write)(lnm_loop_conn *conn);
lnm_loop_queue *wq;
struct {
// Mutex shared between all threads; used for counting thread IDs
pthread_mutex_t mutex;
size_t worker_count;
size_t epoll_count;
} threads;
} lnm_loop; } lnm_loop;
lnm_err lnm_loop_init(lnm_loop **out, void *gctx, lnm_err lnm_loop_init(lnm_loop **out, void *gctx,
@ -49,6 +95,46 @@ lnm_err lnm_loop_init(lnm_loop **out, void *gctx,
lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port); lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port);
lnm_err lnm_loop_run(lnm_loop *l, int thread_count); /**
* Run a single epoll thread of the event loop.
*/
lnm_err lnm_loop_run(lnm_loop *l);
/**
* Run a multithreaded event loop with the configured number of threads.
*/
lnm_err lnm_loop_run_multi(lnm_loop *l, size_t epoll_threads,
size_t worker_threads);
/**
* Advance the processing of the given connection.
*
* Behavior of this function depends on both the connection state and whether
* worker threads are enabled.
*
* For IO states, this function will perform network I/O along with executing
* the loop's respective processing functions.
*
* For work states, the respective processing functions are executed without
* performing any network I/O. If no worker queue is present, this function
* performs all blocking work until an I/O or the end state is reached. If there
* is a worker queue present, only one block of work is done before exiting,
* allowing further blocks of work to be scheduled on other worker threads.
*
* If no worker queue is present, this function will only exit once an I/O or
* end state is reached.
*/
void lnm_loop_conn_advance(lnm_loop *l, lnm_loop_conn *conn);
/**
* Reschedule the given connection, either on the event loop for network IO or
* on a worker thread for blocking work. Connections are terminated as needed.
*/
void lnm_loop_conn_schedule(lnm_loop *l, lnm_loop_conn *conn);
/**
* Main loop executed on the worker threads.
*/
void lnm_loop_worker_run(void *arg);
#endif #endif

View File

@ -8,6 +8,7 @@
typedef struct lnm_http_step { typedef struct lnm_http_step {
lnm_http_step_fn fn; lnm_http_step_fn fn;
struct lnm_http_step *next; struct lnm_http_step *next;
bool blocking;
} lnm_http_step; } lnm_http_step;
typedef enum lnm_http_route_type { typedef enum lnm_http_route_type {

View File

@ -9,6 +9,6 @@ void lnm_loop_conn_free(lnm_loop *l, lnm_loop_conn *conn);
lnm_err lnm_loop_accept(lnm_loop *l); lnm_err lnm_loop_accept(lnm_loop *l);
void lnm_loop_conn_io(lnm_loop *l, lnm_loop_conn *conn); void lnm_loop_conn_advance(lnm_loop *l, lnm_loop_conn *conn);
#endif #endif

View File

@ -28,7 +28,8 @@ lnm_err lnm_http_loop_init(lnm_http_loop **out, void *c_gctx,
return lnm_err_ok; return lnm_err_ok;
} }
lnm_err lnm_http_step_init(lnm_http_step **out, lnm_http_step_fn fn) { lnm_err lnm_http_step_append(lnm_http_step **out, lnm_http_step_fn fn,
bool blocking) {
lnm_http_step *step = calloc(1, sizeof(lnm_http_step)); lnm_http_step *step = calloc(1, sizeof(lnm_http_step));
if (step == NULL) { if (step == NULL) {
@ -36,22 +37,17 @@ lnm_err lnm_http_step_init(lnm_http_step **out, lnm_http_step_fn fn) {
} }
step->fn = fn; step->fn = fn;
step->blocking = blocking;
if ((*out) != NULL) {
(*out)->next = step;
}
*out = step; *out = step;
return lnm_err_ok; return lnm_err_ok;
} }
lnm_err lnm_http_step_append(lnm_http_step **out, lnm_http_step *step,
lnm_http_step_fn fn) {
LNM_RES(lnm_http_step_init(out, fn));
if (step != NULL) {
step->next = *out;
}
return lnm_err_ok;
}
lnm_err lnm_http_route_init(lnm_http_route **out) { lnm_err lnm_http_route_init(lnm_http_route **out) {
lnm_http_route *route = calloc(1, sizeof(lnm_http_route)); lnm_http_route *route = calloc(1, sizeof(lnm_http_route));
@ -122,9 +118,10 @@ lnm_err lnm_http_loop_route_add(lnm_http_loop *hl, lnm_http_route *route) {
return lnm_err_ok; return lnm_err_ok;
} }
lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port, int thread_count) { lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port,
size_t epoll_threads, size_t worker_threads) {
LNM_RES(lnm_loop_setup(hl, port)); LNM_RES(lnm_loop_setup(hl, port));
return lnm_loop_run(hl, thread_count); return lnm_loop_run_multi(hl, epoll_threads, worker_threads);
} }
void lnm_http_loop_set_api_key(lnm_http_loop *hl, const char *api_key) { void lnm_http_loop_set_api_key(lnm_http_loop *hl, const char *api_key) {

View File

@ -133,11 +133,19 @@ void lnm_http_loop_process_steps(lnm_http_conn *conn) {
while ((ctx->cur_step != NULL) && (step != ctx->cur_step)) { while ((ctx->cur_step != NULL) && (step != ctx->cur_step)) {
step = ctx->cur_step; step = ctx->cur_step;
if (step->blocking && (conn->state != lnm_loop_state_req_work)) {
conn->state = lnm_loop_state_req_work;
break;
}
switch (step->fn(conn)) { switch (step->fn(conn)) {
case lnm_http_step_err_done: case lnm_http_step_err_done:
ctx->cur_step = ctx->cur_step->next; ctx->cur_step = ctx->cur_step->next;
break; break;
case lnm_http_step_err_io_needed: case lnm_http_step_err_io_needed:
// Ensure steps that require more I/O are executed on the event loop
conn->state = lnm_loop_state_req_io;
break; break;
case lnm_http_step_err_close: case lnm_http_step_err_close:
conn->state = lnm_loop_state_end; conn->state = lnm_loop_state_end;
@ -149,6 +157,7 @@ void lnm_http_loop_process_steps(lnm_http_conn *conn) {
} }
if (ctx->cur_step == NULL) { if (ctx->cur_step == NULL) {
conn->state = lnm_loop_state_res_io;
ctx->state = lnm_http_loop_state_add_headers; ctx->state = lnm_http_loop_state_add_headers;
} }
} }
@ -325,23 +334,23 @@ void (*process_fns[])(lnm_http_conn *conn) = {
lnm_loop_state state_map[] = { lnm_loop_state state_map[] = {
// parse_req // parse_req
lnm_loop_state_req, lnm_loop_state_req_io,
// route // route
lnm_loop_state_req, lnm_loop_state_req_io,
// parse_headers // parse_headers
lnm_loop_state_req, lnm_loop_state_req_io,
// steps // steps
lnm_loop_state_req, lnm_loop_state_req_io,
// add_headers // add_headers
lnm_loop_state_req, lnm_loop_state_req_io,
// write_status_line // write_status_line
lnm_loop_state_res, lnm_loop_state_res_io,
// write_headers // write_headers
lnm_loop_state_res, lnm_loop_state_res_io,
// write_body // write_body
lnm_loop_state_res, lnm_loop_state_res_io,
// finish // finish
lnm_loop_state_res, lnm_loop_state_res_io,
}; };
void lnm_http_loop_process(lnm_http_conn *conn) { void lnm_http_loop_process(lnm_http_conn *conn) {
@ -369,7 +378,7 @@ void lnm_http_loop_process(lnm_http_conn *conn) {
// We move the request to a dedicated buffer if the read buffer needs to be // We move the request to a dedicated buffer if the read buffer needs to be
// reused // reused
if ((conn->state == lnm_loop_state_req) && (conn->state == loop_state) && if ((conn->state == lnm_loop_state_req_io) && (conn->state == loop_state) &&
(!ctx->req.buf.owned) && (ctx->req.buf.len > 0)) { (!ctx->req.buf.owned) && (ctx->req.buf.len > 0)) {
char *buf = malloc(ctx->req.buf.len); char *buf = malloc(ctx->req.buf.len);

View File

@ -30,6 +30,8 @@ lnm_err lnm_loop_init(lnm_loop **out, void *gctx,
l->data_read = data_read; l->data_read = data_read;
l->data_write = data_write; l->data_write = data_write;
pthread_mutex_init(&l->threads.mutex, NULL);
*out = l; *out = l;
return lnm_err_ok; return lnm_err_ok;
@ -53,7 +55,7 @@ lnm_err lnm_loop_accept(lnm_loop *l) {
LNM_RES2(lnm_loop_conn_init(&conn, l), close(conn_fd)); LNM_RES2(lnm_loop_conn_init(&conn, l), close(conn_fd));
conn->fd = conn_fd; conn->fd = conn_fd;
conn->state = lnm_loop_state_req; conn->state = lnm_loop_state_req_io;
struct epoll_event event = {.data.ptr = conn, struct epoll_event event = {.data.ptr = conn,
.events = EPOLLIN | EPOLLET | EPOLLONESHOT}; .events = EPOLLIN | EPOLLET | EPOLLONESHOT};
@ -62,6 +64,10 @@ lnm_err lnm_loop_accept(lnm_loop *l) {
l->open++; l->open++;
// Make sure to re-arm the listening socket after accepting
event.data.ptr = NULL;
epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, l->listen_fd, &event);
lnm_ldebug(section, "connection opened with fd %i", conn_fd); lnm_ldebug(section, "connection opened with fd %i", conn_fd);
return lnm_err_ok; return lnm_err_ok;
@ -123,19 +129,51 @@ lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port) {
return lnm_err_ok; return lnm_err_ok;
} }
typedef struct lnm_loop_thread_args { void lnm_loop_conn_schedule(lnm_loop *l, lnm_loop_conn *conn) {
lnm_loop *l; switch (conn->state) {
int id; // IO states get rescheduled in the epoll loop
int thread_count; case lnm_loop_state_req_io:
} lnm_loop_thread_args; case lnm_loop_state_res_io: {
struct epoll_event event = {
.data.ptr = conn,
.events = (conn->state == lnm_loop_state_req_io ? EPOLLIN : EPOLLOUT) |
EPOLLET | EPOLLONESHOT};
lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) { epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, conn->fd, &event);
lnm_loop *l = args->l; } break;
int thread_id = args->id; case lnm_loop_state_req_work:
int thread_count = args->thread_count; case lnm_loop_state_res_work:
lnm_loop_queue_push(l->wq, conn);
break;
case lnm_loop_state_end: {
int conn_fd = conn->fd;
lnm_loop_conn_free(l, conn);
close(conn_fd);
l->open--;
epoll_ctl(l->epoll_fd, EPOLL_CTL_DEL, conn_fd, NULL);
lnm_ldebug(section, "connection closed with fd %i", conn_fd);
} break;
}
}
lnm_err lnm_loop_run(lnm_loop *l) {
if (l->epoll_fd == 0) {
return lnm_err_not_setup;
}
// Get thread ID by incrementing counter
pthread_mutex_lock(&l->threads.mutex);
int thread_id = l->threads.epoll_count;
l->threads.epoll_count++;
pthread_mutex_unlock(&l->threads.mutex);
struct epoll_event *events = calloc(1, sizeof(struct epoll_event)); struct epoll_event *events = calloc(1, sizeof(struct epoll_event));
int events_cap = 1; size_t events_cap = 1;
if (events == NULL) { if (events == NULL) {
return lnm_err_failed_alloc; return lnm_err_failed_alloc;
@ -143,9 +181,6 @@ lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) {
lnm_linfo(section, "thread %i started", thread_id); lnm_linfo(section, "thread %i started", thread_id);
struct epoll_event listen_event = {
.data.ptr = NULL, .events = EPOLLIN | EPOLLET | EPOLLONESHOT};
while (1) { while (1) {
int polled = epoll_wait(l->epoll_fd, events, events_cap, -1); int polled = epoll_wait(l->epoll_fd, events, events_cap, -1);
lnm_ldebug(section, "polled (thread %i): %i", thread_id, polled); lnm_ldebug(section, "polled (thread %i): %i", thread_id, polled);
@ -157,36 +192,19 @@ lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) {
for (int i = 0; i < polled; i++) { for (int i = 0; i < polled; i++) {
if (events[i].data.ptr == NULL) { if (events[i].data.ptr == NULL) {
lnm_loop_accept(l); lnm_loop_accept(l);
epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, l->listen_fd, &listen_event);
} else { } else {
lnm_loop_conn *conn = events[i].data.ptr; lnm_loop_conn *conn = events[i].data.ptr;
lnm_loop_conn_io(l, conn);
if (conn->state == lnm_loop_state_end) { // At this point, state is always an IO state
int conn_fd = conn->fd; lnm_loop_conn_advance(l, conn);
lnm_loop_conn_schedule(l, conn);
lnm_loop_conn_free(l, conn);
close(conn_fd);
l->open--;
epoll_ctl(l->epoll_fd, EPOLL_CTL_DEL, conn_fd, NULL);
lnm_ldebug(section, "connection closed with fd %i", conn_fd);
} else {
struct epoll_event event = {
.data.ptr = conn,
.events =
(conn->state == lnm_loop_state_req ? EPOLLIN : EPOLLOUT) |
EPOLLET | EPOLLONESHOT};
epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, conn->fd, &event);
}
} }
} }
int open = l->open; size_t open = l->open;
int cap_per_thread = size_t cap_per_thread = open + 1 > l->threads.epoll_count
open + 1 > thread_count ? (open + 1) / thread_count : 1; ? (open + 1) / l->threads.epoll_count
: 1;
if (cap_per_thread > events_cap) { if (cap_per_thread > events_cap) {
struct epoll_event *new_events = struct epoll_event *new_events =
@ -205,28 +223,21 @@ lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) {
return lnm_err_ok; return lnm_err_ok;
} }
lnm_err lnm_loop_run(lnm_loop *l, int thread_count) { lnm_err lnm_loop_run_multi(lnm_loop *l, size_t epoll_threads,
if (l->epoll_fd == 0) { size_t worker_threads) {
return lnm_err_not_setup; if (worker_threads > 0) {
LNM_RES(lnm_loop_queue_init(&l->wq, LNM_QUEUE_MULTIPLIER * worker_threads));
} }
lnm_loop_thread_args args[thread_count]; pthread_t t;
for (int i = 1; i < thread_count; i++) { for (size_t i = 1; i < epoll_threads; i++) {
args[i].l = l; pthread_create(&t, NULL, (void *(*)(void *))lnm_loop_run, l);
args[i].id = i;
args[i].thread_count = thread_count;
pthread_t thread;
pthread_create(&thread, NULL, (void *(*)(void *))lnm_loop_run_thread,
&args[i]);
} }
args[0].l = l; for (size_t i = 0; i < worker_threads; i++) {
args[0].id = 0; pthread_create(&t, NULL, (void *(*)(void *))lnm_loop_worker_run, l);
args[0].thread_count = thread_count; }
lnm_loop_run_thread(&args[0]); return lnm_loop_run(l);
return lnm_err_ok;
} }

View File

@ -1,5 +1,6 @@
#include <errno.h> #include <errno.h>
#include <string.h> #include <string.h>
#include <sys/socket.h>
#include <unistd.h> #include <unistd.h>
#include "lnm/loop.h" #include "lnm/loop.h"
@ -33,7 +34,7 @@ void lnm_loop_conn_io_req(lnm_loop *l, lnm_loop_conn *conn) {
conn->r.size += res; conn->r.size += res;
l->data_read(conn); l->data_read(conn);
} while (conn->state == lnm_loop_state_req); } while (conn->state == lnm_loop_state_req_io);
} }
void lnm_loop_conn_io_res(lnm_loop *l, lnm_loop_conn *conn) { void lnm_loop_conn_io_res(lnm_loop *l, lnm_loop_conn *conn) {
@ -43,7 +44,9 @@ void lnm_loop_conn_io_res(lnm_loop *l, lnm_loop_conn *conn) {
ssize_t res; ssize_t res;
do { do {
res = write(conn->fd, conn->w.buf, conn->w.size); // Send with MSG_NOSIGNAL prevents closed pipes from exiting the program
// with SIGPIPE
res = send(conn->fd, conn->w.buf, conn->w.size, MSG_NOSIGNAL);
} while (res < 0 && errno == EINTR); } while (res < 0 && errno == EINTR);
// Write can't be performed without blocking; we come back later // Write can't be performed without blocking; we come back later
@ -61,17 +64,33 @@ void lnm_loop_conn_io_res(lnm_loop *l, lnm_loop_conn *conn) {
// writer function more space to work with // writer function more space to work with
memmove(conn->w.buf, &conn->w.buf[res], conn->w.size - res); memmove(conn->w.buf, &conn->w.buf[res], conn->w.size - res);
conn->w.size -= res; conn->w.size -= res;
} while (conn->state == lnm_loop_state_res); } while (conn->state == lnm_loop_state_res_io);
} }
void lnm_loop_conn_io(lnm_loop *l, lnm_loop_conn *conn) { void lnm_loop_conn_advance(lnm_loop *l, lnm_loop_conn *conn) {
do {
switch (conn->state) { switch (conn->state) {
case lnm_loop_state_req: case lnm_loop_state_req_io:
lnm_loop_conn_io_req(l, conn); lnm_loop_conn_io_req(l, conn);
break; break;
case lnm_loop_state_res: case lnm_loop_state_res_io:
lnm_loop_conn_io_res(l, conn); lnm_loop_conn_io_res(l, conn);
break; break;
case lnm_loop_state_req_work:
do {
l->data_read(conn);
} while (conn->state == lnm_loop_state_req_work);
break;
case lnm_loop_state_res_work:
do {
l->data_write(conn);
} while (conn->state == lnm_loop_state_res_work);
break;
default:; default:;
} }
} }
// Execute all blocking work if we're running in single-threaded mode
while (l->wq == NULL && (conn->state == lnm_loop_state_req_work ||
conn->state == lnm_loop_state_res_work));
}

View File

@ -0,0 +1,92 @@
#include <sys/epoll.h>
#include "lnm/log.h"
#include "lnm/loop.h"
lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap) {
lnm_loop_conn **arr = calloc(cap, sizeof(lnm_loop_conn *));
if (arr == NULL) {
return lnm_err_failed_alloc;
}
lnm_loop_queue *q = calloc(1, sizeof(lnm_loop_queue));
if (q == NULL) {
free(arr);
return lnm_err_failed_alloc;
}
q->buf.arr = arr;
q->buf.len = cap;
q->tail = 0;
q->head = 0;
q->empty = true;
pthread_mutex_init(&q->mutex, NULL);
pthread_cond_init(&q->cond, NULL);
*out = q;
return lnm_err_ok;
}
void lnm_loop_queue_push(lnm_loop_queue *q, lnm_loop_conn *conn) {
pthread_mutex_lock(&q->mutex);
while (q->head == q->tail && !q->empty) {
pthread_cond_wait(&q->cond, &q->mutex);
}
q->buf.arr[q->head] = conn;
// Make sure the index wraps around
q->head = (q->head + 1) % q->buf.len;
q->empty = false;
// Unlock mutex and signal to waiting threads
pthread_mutex_unlock(&q->mutex);
pthread_cond_signal(&q->cond);
}
lnm_loop_conn *lnm_loop_queue_pop(lnm_loop_queue *q) {
pthread_mutex_lock(&q->mutex);
while (q->empty) {
pthread_cond_wait(&q->cond, &q->mutex);
}
lnm_loop_conn *out = q->buf.arr[q->tail];
q->tail = (q->tail + 1) % q->buf.len;
q->empty = q->tail == q->head;
// Unlock mutex and signal to waiting threads
pthread_mutex_unlock(&q->mutex);
pthread_cond_signal(&q->cond);
return out;
}
void lnm_loop_worker_run(void *arg) {
lnm_loop *l = arg;
lnm_loop_queue *q = l->wq;
// Get thread ID by incrementing counter
pthread_mutex_lock(&l->threads.mutex);
int thread_id = l->threads.worker_count;
l->threads.worker_count++;
pthread_mutex_unlock(&l->threads.mutex);
while (1) {
lnm_loop_conn *conn = lnm_loop_queue_pop(q);
lnm_ldebug("loop", "worker %i processing fd %i", thread_id, conn->fd);
lnm_loop_conn_advance(l, conn);
lnm_loop_conn_schedule(l, conn);
}
}