Compare commits
No commits in common. "ccb9b77cc854540ef3279f0c3af9fb704e3c7efa" and "854e8c3809bf97a3c395525d15b8a3fe00e89005" have entirely different histories.
ccb9b77cc8
...
854e8c3809
|
|
@ -1,7 +1,6 @@
|
||||||
#ifndef LNM_LOOP
|
#ifndef LNM_LOOP
|
||||||
#define LNM_LOOP
|
#define LNM_LOOP
|
||||||
|
|
||||||
#include <pthread.h>
|
|
||||||
#include <stdatomic.h>
|
#include <stdatomic.h>
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
|
|
@ -10,17 +9,12 @@
|
||||||
|
|
||||||
#define LNM_LOOP_BUF_SIZE 2048
|
#define LNM_LOOP_BUF_SIZE 2048
|
||||||
|
|
||||||
typedef enum lnm_loop_state {
|
typedef enum {
|
||||||
lnm_loop_state_req = 0,
|
lnm_loop_state_req = 0,
|
||||||
lnm_loop_state_res,
|
lnm_loop_state_res,
|
||||||
lnm_loop_state_end,
|
lnm_loop_state_end,
|
||||||
lnm_loop_state_req_blocking,
|
|
||||||
lnm_loop_state_res_blocking,
|
|
||||||
} lnm_loop_state;
|
} lnm_loop_state;
|
||||||
|
|
||||||
/**
|
|
||||||
* State for a currently active connection
|
|
||||||
*/
|
|
||||||
typedef struct lnm_loop_conn {
|
typedef struct lnm_loop_conn {
|
||||||
int fd;
|
int fd;
|
||||||
lnm_loop_state state;
|
lnm_loop_state state;
|
||||||
|
|
@ -36,37 +30,6 @@ typedef struct lnm_loop_conn {
|
||||||
} w;
|
} w;
|
||||||
} lnm_loop_conn;
|
} lnm_loop_conn;
|
||||||
|
|
||||||
/**
|
|
||||||
* Concurrent fixed-size queue used to distribute work among worker threads
|
|
||||||
*/
|
|
||||||
typedef struct lnm_loop_queue {
|
|
||||||
struct {
|
|
||||||
lnm_loop_conn **arr;
|
|
||||||
size_t len;
|
|
||||||
} buf;
|
|
||||||
size_t head;
|
|
||||||
size_t tail;
|
|
||||||
pthread_mutex_t mutex;
|
|
||||||
pthread_cond_t cond;
|
|
||||||
} lnm_loop_queue;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Initialize a new queue with the specified fixed capacity
|
|
||||||
*/
|
|
||||||
lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Queue the given connection. If the queue is currently full, this action
|
|
||||||
* blocks until there is space available.
|
|
||||||
*/
|
|
||||||
void lnm_loop_queue_push(lnm_loop_queue *q, lnm_loop_conn *conn);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Pop a connection from the queue. This action blocks until a connection is
|
|
||||||
* available.
|
|
||||||
*/
|
|
||||||
lnm_loop_conn *lnm_loop_queue_pop(lnm_loop_queue *q);
|
|
||||||
|
|
||||||
typedef struct lnm_loop {
|
typedef struct lnm_loop {
|
||||||
int listen_fd;
|
int listen_fd;
|
||||||
int epoll_fd;
|
int epoll_fd;
|
||||||
|
|
@ -76,7 +39,6 @@ typedef struct lnm_loop {
|
||||||
void (*ctx_free)(void *ctx);
|
void (*ctx_free)(void *ctx);
|
||||||
void (*data_read)(lnm_loop_conn *conn);
|
void (*data_read)(lnm_loop_conn *conn);
|
||||||
void (*data_write)(lnm_loop_conn *conn);
|
void (*data_write)(lnm_loop_conn *conn);
|
||||||
lnm_loop_queue *wq;
|
|
||||||
} lnm_loop;
|
} lnm_loop;
|
||||||
|
|
||||||
lnm_err lnm_loop_init(lnm_loop **out, void *gctx,
|
lnm_err lnm_loop_init(lnm_loop **out, void *gctx,
|
||||||
|
|
@ -89,15 +51,4 @@ lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port);
|
||||||
|
|
||||||
lnm_err lnm_loop_run(lnm_loop *l, int thread_count);
|
lnm_err lnm_loop_run(lnm_loop *l, int thread_count);
|
||||||
|
|
||||||
/**
|
|
||||||
* Reschedule the given connection, either on the event loop for network IO or
|
|
||||||
* on a worker thread for blocking work. Connections are terminated as needed.
|
|
||||||
*/
|
|
||||||
void lnm_loop_conn_schedule(lnm_loop *l, lnm_loop_conn *conn);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Main loop executed on the worker threads.
|
|
||||||
*/
|
|
||||||
void lnm_loop_worker_run(void *arg);
|
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
||||||
|
|
@ -62,10 +62,6 @@ lnm_err lnm_loop_accept(lnm_loop *l) {
|
||||||
|
|
||||||
l->open++;
|
l->open++;
|
||||||
|
|
||||||
// Make sure to re-arm the listening socket after accepting
|
|
||||||
event.data.ptr = NULL;
|
|
||||||
epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, l->listen_fd, &event);
|
|
||||||
|
|
||||||
lnm_ldebug(section, "connection opened with fd %i", conn_fd);
|
lnm_ldebug(section, "connection opened with fd %i", conn_fd);
|
||||||
|
|
||||||
return lnm_err_ok;
|
return lnm_err_ok;
|
||||||
|
|
@ -127,36 +123,6 @@ lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port) {
|
||||||
return lnm_err_ok;
|
return lnm_err_ok;
|
||||||
}
|
}
|
||||||
|
|
||||||
void lnm_loop_conn_schedule(lnm_loop *l, lnm_loop_conn *conn) {
|
|
||||||
switch (conn->state) {
|
|
||||||
// IO states get rescheduled in the epoll loop
|
|
||||||
case lnm_loop_state_req:
|
|
||||||
case lnm_loop_state_res: {
|
|
||||||
struct epoll_event event = {
|
|
||||||
.data.ptr = conn,
|
|
||||||
.events = (conn->state == lnm_loop_state_req ? EPOLLIN : EPOLLOUT) |
|
|
||||||
EPOLLET | EPOLLONESHOT};
|
|
||||||
|
|
||||||
epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, conn->fd, &event);
|
|
||||||
} break;
|
|
||||||
case lnm_loop_state_req_blocking:
|
|
||||||
case lnm_loop_state_res_blocking:
|
|
||||||
lnm_loop_queue_push(l->wq, conn);
|
|
||||||
break;
|
|
||||||
case lnm_loop_state_end: {
|
|
||||||
int conn_fd = conn->fd;
|
|
||||||
|
|
||||||
lnm_loop_conn_free(l, conn);
|
|
||||||
close(conn_fd);
|
|
||||||
l->open--;
|
|
||||||
|
|
||||||
epoll_ctl(l->epoll_fd, EPOLL_CTL_DEL, conn_fd, NULL);
|
|
||||||
|
|
||||||
lnm_ldebug(section, "connection closed with fd %i", conn_fd);
|
|
||||||
} break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
typedef struct lnm_loop_thread_args {
|
typedef struct lnm_loop_thread_args {
|
||||||
lnm_loop *l;
|
lnm_loop *l;
|
||||||
int id;
|
int id;
|
||||||
|
|
@ -177,6 +143,9 @@ lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) {
|
||||||
|
|
||||||
lnm_linfo(section, "thread %i started", thread_id);
|
lnm_linfo(section, "thread %i started", thread_id);
|
||||||
|
|
||||||
|
struct epoll_event listen_event = {
|
||||||
|
.data.ptr = NULL, .events = EPOLLIN | EPOLLET | EPOLLONESHOT};
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
int polled = epoll_wait(l->epoll_fd, events, events_cap, -1);
|
int polled = epoll_wait(l->epoll_fd, events, events_cap, -1);
|
||||||
lnm_ldebug(section, "polled (thread %i): %i", thread_id, polled);
|
lnm_ldebug(section, "polled (thread %i): %i", thread_id, polled);
|
||||||
|
|
@ -188,12 +157,30 @@ lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) {
|
||||||
for (int i = 0; i < polled; i++) {
|
for (int i = 0; i < polled; i++) {
|
||||||
if (events[i].data.ptr == NULL) {
|
if (events[i].data.ptr == NULL) {
|
||||||
lnm_loop_accept(l);
|
lnm_loop_accept(l);
|
||||||
|
|
||||||
|
epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, l->listen_fd, &listen_event);
|
||||||
} else {
|
} else {
|
||||||
lnm_loop_conn *conn = events[i].data.ptr;
|
lnm_loop_conn *conn = events[i].data.ptr;
|
||||||
|
|
||||||
// At this point, state is always an IO state
|
|
||||||
lnm_loop_conn_io(l, conn);
|
lnm_loop_conn_io(l, conn);
|
||||||
lnm_loop_conn_schedule(l, conn);
|
|
||||||
|
if (conn->state == lnm_loop_state_end) {
|
||||||
|
int conn_fd = conn->fd;
|
||||||
|
|
||||||
|
lnm_loop_conn_free(l, conn);
|
||||||
|
close(conn_fd);
|
||||||
|
l->open--;
|
||||||
|
|
||||||
|
epoll_ctl(l->epoll_fd, EPOLL_CTL_DEL, conn_fd, NULL);
|
||||||
|
|
||||||
|
lnm_ldebug(section, "connection closed with fd %i", conn_fd);
|
||||||
|
} else {
|
||||||
|
struct epoll_event event = {
|
||||||
|
.data.ptr = conn,
|
||||||
|
.events =
|
||||||
|
(conn->state == lnm_loop_state_req ? EPOLLIN : EPOLLOUT) |
|
||||||
|
EPOLLET | EPOLLONESHOT};
|
||||||
|
epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, conn->fd, &event);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,84 +0,0 @@
|
||||||
#include <sys/epoll.h>
|
|
||||||
|
|
||||||
#include "lnm/loop.h"
|
|
||||||
|
|
||||||
lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap) {
|
|
||||||
lnm_loop_conn **arr = malloc(cap * sizeof(lnm_loop_conn *));
|
|
||||||
|
|
||||||
if (arr == NULL) {
|
|
||||||
return lnm_err_failed_alloc;
|
|
||||||
}
|
|
||||||
|
|
||||||
lnm_loop_queue *q = calloc(1, sizeof(lnm_loop_queue));
|
|
||||||
|
|
||||||
if (q == NULL) {
|
|
||||||
free(arr);
|
|
||||||
|
|
||||||
return lnm_err_failed_alloc;
|
|
||||||
}
|
|
||||||
|
|
||||||
q->buf.arr = arr;
|
|
||||||
q->buf.len = cap;
|
|
||||||
|
|
||||||
pthread_mutex_init(&q->mutex, NULL);
|
|
||||||
pthread_cond_init(&q->cond, NULL);
|
|
||||||
|
|
||||||
*out = q;
|
|
||||||
|
|
||||||
return lnm_err_ok;
|
|
||||||
}
|
|
||||||
|
|
||||||
void lnm_loop_queue_push(lnm_loop_queue *q, lnm_loop_conn *conn) {
|
|
||||||
pthread_mutex_lock(&q->mutex);
|
|
||||||
|
|
||||||
while (q->tail == q->head) {
|
|
||||||
pthread_cond_wait(&q->cond, &q->mutex);
|
|
||||||
}
|
|
||||||
|
|
||||||
q->buf.arr[q->tail] = conn;
|
|
||||||
|
|
||||||
// Make sure the index wraps around
|
|
||||||
q->tail = (q->tail + 1) % q->buf.len;
|
|
||||||
|
|
||||||
// Unlock mutex and signal to waiting threads
|
|
||||||
pthread_mutex_unlock(&q->mutex);
|
|
||||||
pthread_cond_signal(&q->cond);
|
|
||||||
}
|
|
||||||
|
|
||||||
lnm_loop_conn *lnm_loop_queue_pop(lnm_loop_queue *q) {
|
|
||||||
pthread_mutex_lock(&q->mutex);
|
|
||||||
|
|
||||||
while (q->tail == q->head) {
|
|
||||||
pthread_cond_wait(&q->cond, &q->mutex);
|
|
||||||
}
|
|
||||||
|
|
||||||
lnm_loop_conn *out = q->buf.arr[q->head];
|
|
||||||
q->head = (q->head + 1) % q->buf.len;
|
|
||||||
|
|
||||||
// Unlock mutex and signal to waiting threads
|
|
||||||
pthread_mutex_unlock(&q->mutex);
|
|
||||||
pthread_cond_signal(&q->cond);
|
|
||||||
|
|
||||||
return out;
|
|
||||||
}
|
|
||||||
|
|
||||||
void lnm_loop_worker_run(void *arg) {
|
|
||||||
lnm_loop *l = arg;
|
|
||||||
lnm_loop_queue *q = l->wq;
|
|
||||||
|
|
||||||
while (1) {
|
|
||||||
lnm_loop_conn *conn = lnm_loop_queue_pop(q);
|
|
||||||
|
|
||||||
switch (conn->state) {
|
|
||||||
case lnm_loop_state_req_blocking:
|
|
||||||
l->data_read(conn);
|
|
||||||
break;
|
|
||||||
case lnm_loop_state_res_blocking:
|
|
||||||
l->data_write(conn);
|
|
||||||
// Other states shouldn't even end up here, so we ignore them
|
|
||||||
default:;
|
|
||||||
}
|
|
||||||
|
|
||||||
lnm_loop_conn_schedule(l, conn);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Loading…
Reference in New Issue