lnm/src/lnm/loop/lnm_loop.c

244 lines
5.9 KiB
C

#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>

#include "lnm/common.h"
#include "lnm/log.h"
#include "lnm/loop.h"
#include "lnm/loop_internal.h"
// Section name passed as the first argument to the lnm_l* logging calls in
// this file
static const char *section = "loop";
/**
 * Allocate a zero-initialised loop, store the given context pointer and
 * callbacks in it and initialise its thread mutex.
 *
 * @param out where the pointer to the new loop is written on success; not
 * written on failure
 * @param gctx pointer stored in the loop's gctx field
 * @param ctx_init callback stored in the loop's ctx_init field
 * @param ctx_free callback stored in the loop's ctx_free field
 * @param data_read callback stored in the loop's data_read field
 * @param data_write callback stored in the loop's data_write field
 * @return lnm_err_ok on success, lnm_err_failed_alloc if the loop could not be
 * allocated or its mutex could not be initialised
 */
lnm_err lnm_loop_init(lnm_loop **out, void *gctx,
                      lnm_err (*ctx_init)(void **out, void *gctx),
                      void (*ctx_free)(void *ctx),
                      void (*data_read)(lnm_loop_conn *conn),
                      void (*data_write)(lnm_loop_conn *conn)) {
  lnm_loop *l = calloc(1, sizeof *l);

  if (l == NULL) {
    return lnm_err_failed_alloc;
  }

  l->gctx = gctx;
  l->ctx_init = ctx_init;
  l->ctx_free = ctx_free;
  l->data_read = data_read;
  l->data_write = data_write;

  // pthread_mutex_init reports resource exhaustion through its return value;
  // a loop with an unusable mutex must not be handed to the caller
  if (pthread_mutex_init(&l->threads.mutex, NULL) != 0) {
    free(l);

    return lnm_err_failed_alloc;
  }

  *out = l;

  return lnm_err_ok;
}
// Accept a single pending connection, make it non-blocking, create its
// lnm_loop_conn and register it with the epoll instance. Every failure path
// releases whatever was acquired up to that point.
static lnm_err lnm_loop_accept_conn(lnm_loop *l) {
  int conn_fd = accept(l->listen_fd, NULL, NULL);

  if (conn_fd < 0) {
    // accept() always returns -1 on failure; errno carries the actual cause
    lnm_lcritical(section, "accept failed: %i", errno);

    return lnm_err_failed_network;
  }

  // Set socket to non-blocking
  int flags = fcntl(conn_fd, F_GETFL);

  if (flags < 0 || fcntl(conn_fd, F_SETFL, flags | O_NONBLOCK) < 0) {
    close(conn_fd);

    return lnm_err_failed_network;
  }

  lnm_loop_conn *conn = NULL;
  lnm_err res = lnm_loop_conn_init(&conn, l);

  if (res != lnm_err_ok) {
    close(conn_fd);

    return res;
  }

  conn->fd = conn_fd;
  conn->state = lnm_loop_state_req_io;

  struct epoll_event event = {.data.ptr = conn,
                              .events = EPOLLIN | EPOLLET | EPOLLONESHOT};

  if (epoll_ctl(l->epoll_fd, EPOLL_CTL_ADD, conn_fd, &event) < 0) {
    // Without a registration the connection would never be serviced, so drop
    // it instead of leaking the conn and the descriptor
    lnm_loop_conn_free(l, conn);
    close(conn_fd);

    return lnm_err_failed_network;
  }

  // NOTE(review): l->open is updated without holding a lock; confirm whether
  // several epoll threads can reach this concurrently
  l->open++;

  lnm_ldebug(section, "connection opened with fd %i", conn_fd);

  return lnm_err_ok;
}

/**
 * Accept one pending connection on the listening socket and register it with
 * the loop's epoll instance, then re-arm the listening socket.
 *
 * The re-arm is performed on every path, including failures: the listening
 * socket is registered with EPOLLONESHOT, so skipping it after a failed
 * accept would stop all further connections from being reported.
 *
 * @param l loop whose listen_fd and epoll_fd are used
 * @return lnm_err_ok on success, lnm_err_failed_network if accepting,
 * configuring or registering the socket (or re-arming the listener) failed, or
 * the error returned by lnm_loop_conn_init
 */
lnm_err lnm_loop_accept(lnm_loop *l) {
  lnm_err res = lnm_loop_accept_conn(l);

  // The listening socket is marked using a NULL data field
  struct epoll_event event = {.data.ptr = NULL,
                              .events = EPOLLIN | EPOLLET | EPOLLONESHOT};

  if (epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, l->listen_fd, &event) < 0) {
    lnm_lcritical(section, "failed to re-arm listening socket: %i", errno);

    if (res == lnm_err_ok) {
      res = lnm_err_failed_network;
    }
  }

  return res;
}
/**
 * Create the non-blocking IPv4 listening socket bound to the given port on all
 * interfaces, create the epoll instance and register the listening socket
 * with it. On success both descriptors are stored in the loop.
 *
 * On any failure every descriptor opened so far is closed again and the loop
 * is left unmodified.
 *
 * @param l loop that receives listen_fd and epoll_fd
 * @param port TCP port to listen on, in host byte order
 * @return lnm_err_ok on success, lnm_err_failed_network if any of the socket
 * or epoll calls failed
 */
lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port) {
  const int reuse = 1;
  // Host-to-network conversion for values that go onto the wire
  struct sockaddr_in addr = {.sin_family = AF_INET,
                             .sin_port = htons(port),
                             .sin_addr.s_addr = htonl(INADDR_ANY)};
  struct epoll_event event = {
      // The listening socket is marked using a NULL data field
      .data.ptr = NULL,
      .events = EPOLLIN | EPOLLET | EPOLLONESHOT};
  int epoll_fd = -1;
  int flags;

  int listen_fd = socket(AF_INET, SOCK_STREAM, 0);

  if (listen_fd < 0) {
    return lnm_err_failed_network;
  }

  if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof reuse) <
      0) {
    goto fail;
  }

  if (bind(listen_fd, (const struct sockaddr *)&addr, sizeof(addr)) < 0) {
    goto fail;
  }

  if (listen(listen_fd, SOMAXCONN) < 0) {
    goto fail;
  }

  flags = fcntl(listen_fd, F_GETFL);

  if (flags < 0 || fcntl(listen_fd, F_SETFL, flags | O_NONBLOCK) < 0) {
    goto fail;
  }

  epoll_fd = epoll_create1(0);

  if (epoll_fd < 0) {
    goto fail;
  }

  if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &event) < 0) {
    goto fail;
  }

  l->listen_fd = listen_fd;
  l->epoll_fd = epoll_fd;

  return lnm_err_ok;

fail:
  // Release whatever was opened before the failing call
  if (epoll_fd >= 0) {
    close(epoll_fd);
  }

  close(listen_fd);

  return lnm_err_failed_network;
}
/**
 * Hand a connection to whatever should process it next, based on its state:
 * IO states are re-armed in the epoll instance, work states are pushed onto
 * the loop's work queue and the end state tears the connection down.
 *
 * @param l loop the connection belongs to
 * @param conn connection to schedule; it is freed when its state is
 * lnm_loop_state_end and must not be used by the caller afterwards
 */
void lnm_loop_conn_schedule(lnm_loop *l, lnm_loop_conn *conn) {
  switch (conn->state) {
  // IO states get rescheduled in the epoll loop
  case lnm_loop_state_req_io:
  case lnm_loop_state_res_io: {
    // Request IO waits for readability, response IO for writability
    struct epoll_event event = {
        .data.ptr = conn,
        .events = (conn->state == lnm_loop_state_req_io ? EPOLLIN : EPOLLOUT) |
                  EPOLLET | EPOLLONESHOT};

    epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, conn->fd, &event);
  } break;
  case lnm_loop_state_req_work:
  case lnm_loop_state_res_work:
    lnm_loop_queue_push(l->wq, conn);
    break;
  case lnm_loop_state_end: {
    // Keep the descriptor around; conn is gone after lnm_loop_conn_free
    int conn_fd = conn->fd;

    // Deregister before closing: once closed, the descriptor number can be
    // handed out again by a concurrent accept(), and a later EPOLL_CTL_DEL
    // would then remove that new connection from the epoll set
    epoll_ctl(l->epoll_fd, EPOLL_CTL_DEL, conn_fd, NULL);

    lnm_loop_conn_free(l, conn);
    close(conn_fd);

    // NOTE(review): l->open is updated without holding a lock; confirm
    // whether several threads can reach this concurrently
    l->open--;

    lnm_ldebug(section, "connection closed with fd %i", conn_fd);
  } break;
  }
}
/**
 * Run the epoll event loop on the calling thread. Events whose data pointer is
 * NULL belong to the listening socket and trigger an accept; all other events
 * carry a connection that is advanced and then rescheduled.
 *
 * The function only returns on error.
 *
 * @param l loop to run; its epoll_fd must have been set up beforehand
 * @return lnm_err_not_setup if epoll_fd is 0, lnm_err_failed_alloc if the
 * event buffer could not be (re)allocated, lnm_err_failed_poll if epoll_wait
 * failed for a reason other than being interrupted by a signal
 */
lnm_err lnm_loop_run(lnm_loop *l) {
  if (l->epoll_fd == 0) {
    return lnm_err_not_setup;
  }

  // Get thread ID by incrementing counter
  pthread_mutex_lock(&l->threads.mutex);
  int thread_id = l->threads.epoll_count;
  l->threads.epoll_count++;
  pthread_mutex_unlock(&l->threads.mutex);

  size_t events_cap = 1;
  struct epoll_event *events = calloc(events_cap, sizeof *events);

  if (events == NULL) {
    return lnm_err_failed_alloc;
  }

  lnm_linfo(section, "thread %i started", thread_id);

  while (1) {
    int polled = epoll_wait(l->epoll_fd, events, (int)events_cap, -1);
    // Capture errno right away; the logging call below may overwrite it
    int wait_errno = errno;

    lnm_ldebug(section, "polled (thread %i): %i", thread_id, polled);

    if (polled < 0) {
      // A signal interrupting the wait is not a failure of the loop
      if (wait_errno == EINTR) {
        continue;
      }

      free(events);

      return lnm_err_failed_poll;
    }

    for (int i = 0; i < polled; i++) {
      if (events[i].data.ptr == NULL) {
        // The result is not acted upon here; the loop keeps running
        lnm_loop_accept(l);
      } else {
        lnm_loop_conn *conn = events[i].data.ptr;

        // At this point, state is always an IO state
        lnm_loop_conn_advance(l, conn);
        lnm_loop_conn_schedule(l, conn);
      }
    }

    // Size the buffer to this thread's share of the open connections plus
    // the listening socket
    size_t open = l->open;
    size_t cap_per_thread = open + 1 > l->threads.epoll_count
                                ? (open + 1) / l->threads.epoll_count
                                : 1;

    if (cap_per_thread > events_cap) {
      // Old contents are not needed across iterations, so a fresh buffer
      // suffices
      struct epoll_event *new_events =
          malloc(cap_per_thread * sizeof *new_events);

      if (new_events == NULL) {
        free(events);

        return lnm_err_failed_alloc;
      }

      free(events);
      events = new_events;
      events_cap = cap_per_thread;
    }
  }

  return lnm_err_ok;
}
// pthread entry point for an additional epoll thread. Calling lnm_loop_run
// through a cast function pointer of a different type is undefined behaviour,
// so it is wrapped in a function with the signature pthread_create expects.
static void *lnm_loop_run_thread(void *arg) {
  lnm_loop_run(arg);

  return NULL;
}

// pthread entry point for a worker thread; wraps lnm_loop_worker_run for the
// same reason as lnm_loop_run_thread.
static void *lnm_loop_worker_thread(void *arg) {
  lnm_loop_worker_run(arg);

  return NULL;
}

// Start one detached thread running the given entry point on the loop.
static lnm_err lnm_loop_spawn(void *(*entry)(void *), lnm_loop *l) {
  pthread_t t;
  int rc = pthread_create(&t, NULL, entry, l);

  if (rc != 0) {
    lnm_lcritical(section, "failed to create thread: %i", rc);

    // NOTE(review): no thread-specific error code is visible in this file;
    // reported as a resource failure
    return lnm_err_failed_alloc;
  }

  // The handle is not kept, so nothing could ever join this thread
  pthread_detach(t);

  return lnm_err_ok;
}

/**
 * Run the loop using multiple threads. The calling thread acts as one of the
 * epoll threads, so epoll_threads - 1 additional epoll threads are started,
 * followed by worker_threads worker threads.
 *
 * @param l loop to run
 * @param epoll_threads total number of epoll threads, including the caller
 * @param worker_threads number of worker threads; the work queue is only
 * initialised if this is greater than zero
 * @return an error if the work queue or one of the threads could not be
 * created; otherwise whatever lnm_loop_run returns on the calling thread
 */
lnm_err lnm_loop_run_multi(lnm_loop *l, size_t epoll_threads,
                           size_t worker_threads) {
  if (worker_threads > 0) {
    LNM_RES(lnm_loop_queue_init(&l->wq, LNM_QUEUE_MULTIPLIER * worker_threads));
  }

  // Start at 1: the calling thread becomes the first epoll thread below
  for (size_t i = 1; i < epoll_threads; i++) {
    lnm_err res = lnm_loop_spawn(lnm_loop_run_thread, l);

    if (res != lnm_err_ok) {
      return res;
    }
  }

  for (size_t i = 0; i < worker_threads; i++) {
    lnm_err res = lnm_loop_spawn(lnm_loop_worker_thread, l);

    if (res != lnm_err_ok) {
      return res;
    }
  }

  return lnm_loop_run(l);
}