lander/lnm/src/loop/lnm_loop.c

233 lines
5.5 KiB
C

#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <sys/epoll.h>
#include <unistd.h>
#include "lnm/common.h"
#include "lnm/log.h"
#include "lnm/loop.h"
#include "lnm/loop_internal.h"
// Section name passed as the first argument to the lnm_l* logging calls in
// this file.
static const char *section = "loop";
/**
 * Allocate a zero-initialised loop and record the caller's context pointer
 * and callbacks in it.
 *
 * @param out receives the newly allocated loop on success; left untouched on
 * failure
 * @param gctx opaque pointer stored in the loop's gctx field
 * @param ctx_init callback stored in the loop's ctx_init field
 * @param ctx_free callback stored in the loop's ctx_free field
 * @param data_read callback stored in the loop's data_read field
 * @param data_write callback stored in the loop's data_write field
 * @return lnm_err_ok on success, lnm_err_failed_alloc if allocation fails
 */
lnm_err lnm_loop_init(lnm_loop **out, void *gctx,
                      lnm_err (*ctx_init)(void **out, void *gctx),
                      void (*ctx_free)(void *ctx),
                      void (*data_read)(lnm_loop_conn *conn),
                      void (*data_write)(lnm_loop_conn *conn)) {
  lnm_loop *loop = calloc(1, sizeof *loop);

  if (loop != NULL) {
    // Every field not assigned here keeps the zero value from calloc.
    loop->data_write = data_write;
    loop->data_read = data_read;
    loop->ctx_free = ctx_free;
    loop->ctx_init = ctx_init;
    loop->gctx = gctx;

    *out = loop;

    return lnm_err_ok;
  }

  return lnm_err_failed_alloc;
}
/**
 * Accept one pending connection on the loop's listening socket and register
 * it with the loop's epoll instance.
 *
 * The accepted socket is switched to non-blocking mode, attached to a new
 * lnm_loop_conn in the lnm_loop_state_req state, and added to epoll as an
 * edge-triggered, one-shot read event. On any failure after accept() the
 * socket is closed again so no descriptor is leaked.
 *
 * @param l loop whose listen_fd and epoll_fd are used
 * @return lnm_err_ok on success; lnm_err_failed_network if accept, fcntl or
 * epoll_ctl fails. A failing lnm_loop_conn_init is handled by LNM_RES2
 * (defined elsewhere; presumably it runs the close and returns the error).
 */
lnm_err lnm_loop_accept(lnm_loop *l) {
  int conn_fd = accept(l->listen_fd, NULL, NULL);

  if (conn_fd < 0) {
    lnm_lcritical(section, "accept failed: %i", conn_fd);

    return lnm_err_failed_network;
  }

  // Set socket to non-blocking
  int flags = fcntl(conn_fd, F_GETFL);

  if (flags < 0 || fcntl(conn_fd, F_SETFL, flags | O_NONBLOCK) < 0) {
    close(conn_fd);

    return lnm_err_failed_network;
  }

  lnm_loop_conn *conn;
  LNM_RES2(lnm_loop_conn_init(&conn, l), close(conn_fd));

  conn->fd = conn_fd;
  conn->state = lnm_loop_state_req;

  struct epoll_event event = {.data.ptr = conn,
                              .events = EPOLLIN | EPOLLET | EPOLLONESHOT};

  // A connection that is not registered would never be serviced again, so
  // release it instead of leaking the object and the descriptor.
  if (epoll_ctl(l->epoll_fd, EPOLL_CTL_ADD, conn_fd, &event) < 0) {
    lnm_loop_conn_free(l, conn);
    close(conn_fd);

    return lnm_err_failed_network;
  }

  // NOTE(review): l->open is also modified by the worker threads without any
  // synchronization visible in this file -- confirm how it is protected.
  l->open++;

  lnm_ldebug(section, "connection opened with fd %i", conn_fd);

  return lnm_err_ok;
}
// Create a non-blocking TCP socket bound to every IPv4 interface on `port`
// and put it into the listening state. Returns the descriptor, or -1 on
// failure with nothing left open.
static int loop_setup_open_listener(uint16_t port) {
  int fd = socket(AF_INET, SOCK_STREAM, 0);

  if (fd < 0) {
    return -1;
  }

  int val = 1;
  // Port and address are converted from host to network byte order.
  struct sockaddr_in addr = {.sin_family = AF_INET,
                             .sin_port = htons(port),
                             .sin_addr.s_addr = htonl(INADDR_ANY)};

  if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)) < 0 ||
      bind(fd, (const struct sockaddr *)&addr, sizeof(addr)) < 0 ||
      listen(fd, SOMAXCONN) < 0) {
    close(fd);

    return -1;
  }

  int flags = fcntl(fd, F_GETFL);

  if (flags < 0 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
    close(fd);

    return -1;
  }

  return fd;
}

/**
 * Open the listening socket and the epoll instance for a loop.
 *
 * The listening socket is registered with epoll as an edge-triggered,
 * one-shot read event. The loop's listen_fd and epoll_fd fields are only
 * written when every step succeeded; on failure every descriptor opened here
 * is closed again.
 *
 * @param l loop to configure
 * @param port TCP port to listen on, in host byte order
 * @return lnm_err_ok on success, lnm_err_failed_network if any system call
 * fails
 */
lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port) {
  int listen_fd = loop_setup_open_listener(port);

  if (listen_fd < 0) {
    return lnm_err_failed_network;
  }

  int epoll_fd = epoll_create1(0);

  if (epoll_fd < 0) {
    close(listen_fd);

    return lnm_err_failed_network;
  }

  struct epoll_event event = {
      // The listening socket is marked using a NULL data field
      .data.ptr = NULL,
      .events = EPOLLIN | EPOLLET | EPOLLONESHOT};

  if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &event) < 0) {
    close(epoll_fd);
    close(listen_fd);

    return lnm_err_failed_network;
  }

  l->listen_fd = listen_fd;
  l->epoll_fd = epoll_fd;

  return lnm_err_ok;
}
// Arguments handed to lnm_loop_run_thread for one worker thread.
typedef struct lnm_loop_thread_args {
// Loop whose epoll instance the thread polls.
lnm_loop *l;
// Index of the thread; used in its log messages.
int id;
// Total number of threads; used to size each thread's event buffer.
int thread_count;
} lnm_loop_thread_args;
lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) {
lnm_loop *l = args->l;
int thread_id = args->id;
int thread_count = args->thread_count;
struct epoll_event *events = calloc(1, sizeof(struct epoll_event));
int events_cap = 1;
if (events == NULL) {
return lnm_err_failed_alloc;
}
lnm_linfo(section, "thread %i started", thread_id);
struct epoll_event listen_event = {
.data.ptr = NULL, .events = EPOLLIN | EPOLLET | EPOLLONESHOT};
while (1) {
int polled = epoll_wait(l->epoll_fd, events, events_cap, -1);
lnm_ldebug(section, "polled (thread %i): %i", thread_id, polled);
if (polled < 0) {
return lnm_err_failed_poll;
}
for (int i = 0; i < polled; i++) {
if (events[i].data.ptr == NULL) {
lnm_loop_accept(l);
epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, l->listen_fd, &listen_event);
} else {
lnm_loop_conn *conn = events[i].data.ptr;
lnm_loop_conn_io(l, conn);
if (conn->state == lnm_loop_state_end) {
int conn_fd = conn->fd;
lnm_loop_conn_free(l, conn);
close(conn_fd);
l->open--;
epoll_ctl(l->epoll_fd, EPOLL_CTL_DEL, conn_fd, NULL);
lnm_ldebug(section, "connection closed with fd %i", conn_fd);
} else {
struct epoll_event event = {
.data.ptr = conn,
.events =
(conn->state == lnm_loop_state_req ? EPOLLIN : EPOLLOUT) |
EPOLLET | EPOLLONESHOT};
epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, conn->fd, &event);
}
}
}
int open = l->open;
int cap_per_thread =
open + 1 > thread_count ? (open + 1) / thread_count : 1;
if (cap_per_thread > events_cap) {
struct epoll_event *new_events =
malloc(cap_per_thread * sizeof(struct epoll_event));
if (new_events == NULL) {
return lnm_err_failed_alloc;
}
free(events);
events = new_events;
events_cap = cap_per_thread;
}
}
return lnm_err_ok;
}
// pthread start routine with the signature pthread_create requires; calling
// lnm_loop_run_thread through a cast function pointer of a different type
// would be undefined behaviour.
static void *loop_run_thread_entry(void *arg) {
  lnm_loop_run_thread(arg);

  return NULL;
}

/**
 * Run the event loop on `thread_count` threads.
 *
 * thread_count - 1 detached worker threads are started and the calling
 * thread becomes worker 0, so this function only returns once worker 0
 * stops. A thread_count below 1 is treated as 1. A worker that fails to
 * start is logged and skipped.
 *
 * @param l loop that has been prepared with lnm_loop_setup
 * @param thread_count total number of worker threads, including the caller
 * @return lnm_err_not_setup if the loop's epoll_fd is still 0; otherwise the
 * result of worker 0
 */
lnm_err lnm_loop_run(lnm_loop *l, int thread_count) {
  if (l->epoll_fd == 0) {
    return lnm_err_not_setup;
  }

  // A non-positive count would make the array below invalid.
  if (thread_count < 1) {
    thread_count = 1;
  }

  // NOTE(review): the workers copy their arguments on entry, but this array
  // lives on the caller's stack -- if worker 0 returns before another worker
  // has started, that worker reads a dead array. Confirm this is acceptable.
  lnm_loop_thread_args args[thread_count];

  for (int i = 0; i < thread_count; i++) {
    args[i].l = l;
    args[i].id = i;
    args[i].thread_count = thread_count;
  }

  for (int i = 1; i < thread_count; i++) {
    pthread_t thread;
    int rc = pthread_create(&thread, NULL, loop_run_thread_entry, &args[i]);

    if (rc != 0) {
      lnm_lcritical(section, "failed to start thread %i: %i", i, rc);
      continue;
    }

    // The handle is never joined, so detach it to let the thread's resources
    // be reclaimed when it exits.
    pthread_detach(thread);
  }

  return lnm_loop_run_thread(&args[0]);
}