Compare commits
3 Commits
ccb9b77cc8
...
64601b5f21
Author | SHA1 | Date |
---|---|---|
Jef Roosens | 64601b5f21 | |
Jef Roosens | a96ae39523 | |
Jef Roosens | 9c01548256 |
|
@ -34,5 +34,5 @@ int main() {
|
|||
lnm_log_init_global();
|
||||
lnm_log_register_stdout(lnm_log_level_debug);
|
||||
|
||||
lnm_http_loop_run(hl, 8080, 1);
|
||||
printf("res = %i\n", lnm_http_loop_run(hl, 8080, 1, 32));
|
||||
}
|
||||
|
|
|
@ -81,7 +81,8 @@ lnm_err lnm_http_route_init_regex(lnm_http_route **out, lnm_http_method method,
|
|||
*/
|
||||
lnm_err lnm_http_loop_route_add(lnm_http_loop *hl, lnm_http_route *route);
|
||||
|
||||
lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port, int thread_count);
|
||||
lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port,
|
||||
size_t epoll_threads, size_t worker_threads);
|
||||
|
||||
void lnm_http_loop_set_api_key(lnm_http_loop *hl, const char *api_key);
|
||||
|
||||
|
|
|
@ -9,6 +9,7 @@
|
|||
#include "lnm/common.h"
|
||||
|
||||
#define LNM_LOOP_BUF_SIZE 2048
|
||||
#define LNM_QUEUE_MULTIPLIER 8
|
||||
|
||||
typedef enum lnm_loop_state {
|
||||
lnm_loop_state_req = 0,
|
||||
|
@ -46,6 +47,7 @@ typedef struct lnm_loop_queue {
|
|||
} buf;
|
||||
size_t head;
|
||||
size_t tail;
|
||||
bool empty;
|
||||
pthread_mutex_t mutex;
|
||||
pthread_cond_t cond;
|
||||
} lnm_loop_queue;
|
||||
|
@ -77,6 +79,12 @@ typedef struct lnm_loop {
|
|||
void (*data_read)(lnm_loop_conn *conn);
|
||||
void (*data_write)(lnm_loop_conn *conn);
|
||||
lnm_loop_queue *wq;
|
||||
struct {
|
||||
// Mutex shared between all threads; used for counting thread IDs
|
||||
pthread_mutex_t mutex;
|
||||
size_t worker_count;
|
||||
size_t epoll_count;
|
||||
} threads;
|
||||
} lnm_loop;
|
||||
|
||||
lnm_err lnm_loop_init(lnm_loop **out, void *gctx,
|
||||
|
@ -87,7 +95,16 @@ lnm_err lnm_loop_init(lnm_loop **out, void *gctx,
|
|||
|
||||
lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port);
|
||||
|
||||
lnm_err lnm_loop_run(lnm_loop *l, int thread_count);
|
||||
/**
|
||||
* Run a single epoll thread of the event loop.
|
||||
*/
|
||||
lnm_err lnm_loop_run(lnm_loop *l);
|
||||
|
||||
/**
|
||||
* Run a multithreaded event loop with the configured number of threads.
|
||||
*/
|
||||
lnm_err lnm_loop_run_multi(lnm_loop *l, size_t epoll_threads,
|
||||
size_t worker_threads);
|
||||
|
||||
/**
|
||||
* Reschedule the given connection, either on the event loop for network IO or
|
||||
|
|
|
@ -8,6 +8,7 @@
|
|||
typedef struct lnm_http_step {
|
||||
lnm_http_step_fn fn;
|
||||
struct lnm_http_step *next;
|
||||
bool blocking;
|
||||
} lnm_http_step;
|
||||
|
||||
typedef enum lnm_http_route_type {
|
||||
|
|
|
@ -122,9 +122,10 @@ lnm_err lnm_http_loop_route_add(lnm_http_loop *hl, lnm_http_route *route) {
|
|||
return lnm_err_ok;
|
||||
}
|
||||
|
||||
lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port, int thread_count) {
|
||||
lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port,
|
||||
size_t epoll_threads, size_t worker_threads) {
|
||||
LNM_RES(lnm_loop_setup(hl, port));
|
||||
return lnm_loop_run(hl, thread_count);
|
||||
return lnm_loop_run_multi(hl, epoll_threads, worker_threads);
|
||||
}
|
||||
|
||||
void lnm_http_loop_set_api_key(lnm_http_loop *hl, const char *api_key) {
|
||||
|
|
|
@ -133,11 +133,19 @@ void lnm_http_loop_process_steps(lnm_http_conn *conn) {
|
|||
while ((ctx->cur_step != NULL) && (step != ctx->cur_step)) {
|
||||
step = ctx->cur_step;
|
||||
|
||||
if (step->blocking && (conn->state != lnm_loop_state_req_blocking)) {
|
||||
conn->state = lnm_loop_state_req_blocking;
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
switch (step->fn(conn)) {
|
||||
case lnm_http_step_err_done:
|
||||
ctx->cur_step = ctx->cur_step->next;
|
||||
break;
|
||||
case lnm_http_step_err_io_needed:
|
||||
// Ensure steps that require more I/O are executed on the event loop
|
||||
conn->state = lnm_loop_state_req;
|
||||
break;
|
||||
case lnm_http_step_err_close:
|
||||
conn->state = lnm_loop_state_end;
|
||||
|
@ -149,6 +157,7 @@ void lnm_http_loop_process_steps(lnm_http_conn *conn) {
|
|||
}
|
||||
|
||||
if (ctx->cur_step == NULL) {
|
||||
conn->state = lnm_loop_state_res;
|
||||
ctx->state = lnm_http_loop_state_add_headers;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,6 +30,8 @@ lnm_err lnm_loop_init(lnm_loop **out, void *gctx,
|
|||
l->data_read = data_read;
|
||||
l->data_write = data_write;
|
||||
|
||||
pthread_mutex_init(&l->threads.mutex, NULL);
|
||||
|
||||
*out = l;
|
||||
|
||||
return lnm_err_ok;
|
||||
|
@ -157,19 +159,21 @@ void lnm_loop_conn_schedule(lnm_loop *l, lnm_loop_conn *conn) {
|
|||
}
|
||||
}
|
||||
|
||||
typedef struct lnm_loop_thread_args {
|
||||
lnm_loop *l;
|
||||
int id;
|
||||
int thread_count;
|
||||
} lnm_loop_thread_args;
|
||||
lnm_err lnm_loop_run(lnm_loop *l) {
|
||||
if (l->epoll_fd == 0) {
|
||||
return lnm_err_not_setup;
|
||||
}
|
||||
|
||||
lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) {
|
||||
lnm_loop *l = args->l;
|
||||
int thread_id = args->id;
|
||||
int thread_count = args->thread_count;
|
||||
// Get thread ID by incrementing counter
|
||||
pthread_mutex_lock(&l->threads.mutex);
|
||||
|
||||
int thread_id = l->threads.epoll_count;
|
||||
l->threads.epoll_count++;
|
||||
|
||||
pthread_mutex_unlock(&l->threads.mutex);
|
||||
|
||||
struct epoll_event *events = calloc(1, sizeof(struct epoll_event));
|
||||
int events_cap = 1;
|
||||
size_t events_cap = 1;
|
||||
|
||||
if (events == NULL) {
|
||||
return lnm_err_failed_alloc;
|
||||
|
@ -197,9 +201,10 @@ lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) {
|
|||
}
|
||||
}
|
||||
|
||||
int open = l->open;
|
||||
int cap_per_thread =
|
||||
open + 1 > thread_count ? (open + 1) / thread_count : 1;
|
||||
size_t open = l->open;
|
||||
size_t cap_per_thread = open + 1 > l->threads.epoll_count
|
||||
? (open + 1) / l->threads.epoll_count
|
||||
: 1;
|
||||
|
||||
if (cap_per_thread > events_cap) {
|
||||
struct epoll_event *new_events =
|
||||
|
@ -218,28 +223,21 @@ lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) {
|
|||
return lnm_err_ok;
|
||||
}
|
||||
|
||||
lnm_err lnm_loop_run(lnm_loop *l, int thread_count) {
|
||||
if (l->epoll_fd == 0) {
|
||||
return lnm_err_not_setup;
|
||||
lnm_err lnm_loop_run_multi(lnm_loop *l, size_t epoll_threads,
|
||||
size_t worker_threads) {
|
||||
if (worker_threads > 0) {
|
||||
LNM_RES(lnm_loop_queue_init(&l->wq, LNM_QUEUE_MULTIPLIER * worker_threads));
|
||||
}
|
||||
|
||||
lnm_loop_thread_args args[thread_count];
|
||||
pthread_t t;
|
||||
|
||||
for (int i = 1; i < thread_count; i++) {
|
||||
args[i].l = l;
|
||||
args[i].id = i;
|
||||
args[i].thread_count = thread_count;
|
||||
|
||||
pthread_t thread;
|
||||
pthread_create(&thread, NULL, (void *(*)(void *))lnm_loop_run_thread,
|
||||
&args[i]);
|
||||
for (size_t i = 1; i < epoll_threads; i++) {
|
||||
pthread_create(&t, NULL, (void *(*)(void *))lnm_loop_run, l);
|
||||
}
|
||||
|
||||
args[0].l = l;
|
||||
args[0].id = 0;
|
||||
args[0].thread_count = thread_count;
|
||||
|
||||
lnm_loop_run_thread(&args[0]);
|
||||
|
||||
return lnm_err_ok;
|
||||
for (size_t i = 0; i < worker_threads; i++) {
|
||||
pthread_create(&t, NULL, (void *(*)(void *))lnm_loop_worker_run, l);
|
||||
}
|
||||
|
||||
return lnm_loop_run(l);
|
||||
}
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
#include <errno.h>
|
||||
#include <string.h>
|
||||
#include <unistd.h>
|
||||
#include <sys/socket.h>
|
||||
|
||||
#include "lnm/loop.h"
|
||||
#include "lnm/loop_internal.h"
|
||||
|
@ -43,7 +44,9 @@ void lnm_loop_conn_io_res(lnm_loop *l, lnm_loop_conn *conn) {
|
|||
ssize_t res;
|
||||
|
||||
do {
|
||||
res = write(conn->fd, conn->w.buf, conn->w.size);
|
||||
// Send with MSG_NOSIGNAL prevents closed pipes from exiting the program
|
||||
// with SIGPIPE
|
||||
res = send(conn->fd, conn->w.buf, conn->w.size, MSG_NOSIGNAL);
|
||||
} while (res < 0 && errno == EINTR);
|
||||
|
||||
// Write can't be performed without blocking; we come back later
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
#include <sys/epoll.h>
|
||||
|
||||
#include "lnm/log.h"
|
||||
#include "lnm/loop.h"
|
||||
|
||||
lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap) {
|
||||
lnm_loop_conn **arr = malloc(cap * sizeof(lnm_loop_conn *));
|
||||
lnm_loop_conn **arr = calloc(cap, sizeof(lnm_loop_conn *));
|
||||
|
||||
if (arr == NULL) {
|
||||
return lnm_err_failed_alloc;
|
||||
|
@ -20,6 +21,10 @@ lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap) {
|
|||
q->buf.arr = arr;
|
||||
q->buf.len = cap;
|
||||
|
||||
q->tail = 0;
|
||||
q->head = 0;
|
||||
q->empty = true;
|
||||
|
||||
pthread_mutex_init(&q->mutex, NULL);
|
||||
pthread_cond_init(&q->cond, NULL);
|
||||
|
||||
|
@ -31,14 +36,15 @@ lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap) {
|
|||
void lnm_loop_queue_push(lnm_loop_queue *q, lnm_loop_conn *conn) {
|
||||
pthread_mutex_lock(&q->mutex);
|
||||
|
||||
while (q->tail == q->head) {
|
||||
while (q->head == q->tail && !q->empty) {
|
||||
pthread_cond_wait(&q->cond, &q->mutex);
|
||||
}
|
||||
|
||||
q->buf.arr[q->tail] = conn;
|
||||
q->buf.arr[q->head] = conn;
|
||||
|
||||
// Make sure the index wraps around
|
||||
q->tail = (q->tail + 1) % q->buf.len;
|
||||
q->head = (q->head + 1) % q->buf.len;
|
||||
q->empty = false;
|
||||
|
||||
// Unlock mutex and signal to waiting threads
|
||||
pthread_mutex_unlock(&q->mutex);
|
||||
|
@ -48,12 +54,14 @@ void lnm_loop_queue_push(lnm_loop_queue *q, lnm_loop_conn *conn) {
|
|||
lnm_loop_conn *lnm_loop_queue_pop(lnm_loop_queue *q) {
|
||||
pthread_mutex_lock(&q->mutex);
|
||||
|
||||
while (q->tail == q->head) {
|
||||
while (q->empty) {
|
||||
pthread_cond_wait(&q->cond, &q->mutex);
|
||||
}
|
||||
|
||||
lnm_loop_conn *out = q->buf.arr[q->head];
|
||||
q->head = (q->head + 1) % q->buf.len;
|
||||
lnm_loop_conn *out = q->buf.arr[q->tail];
|
||||
|
||||
q->tail = (q->tail + 1) % q->buf.len;
|
||||
q->empty = q->tail == q->head;
|
||||
|
||||
// Unlock mutex and signal to waiting threads
|
||||
pthread_mutex_unlock(&q->mutex);
|
||||
|
@ -66,8 +74,17 @@ void lnm_loop_worker_run(void *arg) {
|
|||
lnm_loop *l = arg;
|
||||
lnm_loop_queue *q = l->wq;
|
||||
|
||||
// Get thread ID by incrementing counter
|
||||
pthread_mutex_lock(&l->threads.mutex);
|
||||
|
||||
int thread_id = l->threads.worker_count;
|
||||
l->threads.worker_count++;
|
||||
|
||||
pthread_mutex_unlock(&l->threads.mutex);
|
||||
|
||||
while (1) {
|
||||
lnm_loop_conn *conn = lnm_loop_queue_pop(q);
|
||||
lnm_ldebug("loop", "worker %i processing fd %i", thread_id, conn->fd);
|
||||
|
||||
switch (conn->state) {
|
||||
case lnm_loop_state_req_blocking:
|
||||
|
|
Loading…
Reference in New Issue