feat(lnm): switch to epoll
parent
89cc41f28a
commit
d53a949946
|
@ -18,6 +18,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||||
* Better API for adding routes
|
* Better API for adding routes
|
||||||
* State machine HTTP loop
|
* State machine HTTP loop
|
||||||
* Automatically support HEAD requests for all GET requests
|
* Automatically support HEAD requests for all GET requests
|
||||||
|
* Event loop uses `epoll` instead of `poll`
|
||||||
|
* Configurable multithreading using `epoll`
|
||||||
* Landerctl
|
* Landerctl
|
||||||
* `-c` flag to use custom config file (useful for testing)
|
* `-c` flag to use custom config file (useful for testing)
|
||||||
|
|
||||||
|
|
|
@ -81,7 +81,7 @@ lnm_err lnm_http_route_init_regex(lnm_http_route **out, lnm_http_method method,
|
||||||
*/
|
*/
|
||||||
lnm_err lnm_http_loop_route_add(lnm_http_loop *hl, lnm_http_route *route);
|
lnm_err lnm_http_loop_route_add(lnm_http_loop *hl, lnm_http_route *route);
|
||||||
|
|
||||||
lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port);
|
lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port, int thread_count);
|
||||||
|
|
||||||
void lnm_http_loop_set_api_key(lnm_http_loop *hl, const char *api_key);
|
void lnm_http_loop_set_api_key(lnm_http_loop *hl, const char *api_key);
|
||||||
|
|
||||||
|
|
|
@ -1,13 +1,13 @@
|
||||||
#ifndef LNM_LOOP
|
#ifndef LNM_LOOP
|
||||||
#define LNM_LOOP
|
#define LNM_LOOP
|
||||||
|
|
||||||
|
#include <stdatomic.h>
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
|
|
||||||
#include "lnm/common.h"
|
#include "lnm/common.h"
|
||||||
|
|
||||||
#define LNM_LOOP_BUF_SIZE 2048
|
#define LNM_LOOP_BUF_SIZE 2048
|
||||||
#define LNM_LOOP_INITIAL_CONNS 16
|
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
lnm_loop_state_req = 0,
|
lnm_loop_state_req = 0,
|
||||||
|
@ -32,11 +32,8 @@ typedef struct lnm_loop_conn {
|
||||||
|
|
||||||
typedef struct lnm_loop {
|
typedef struct lnm_loop {
|
||||||
int listen_fd;
|
int listen_fd;
|
||||||
struct {
|
int epoll_fd;
|
||||||
lnm_loop_conn **arr;
|
atomic_int open;
|
||||||
size_t len;
|
|
||||||
size_t open;
|
|
||||||
} conns;
|
|
||||||
void *gctx;
|
void *gctx;
|
||||||
lnm_err (*ctx_init)(void **out, void *gctx);
|
lnm_err (*ctx_init)(void **out, void *gctx);
|
||||||
void (*ctx_free)(void *ctx);
|
void (*ctx_free)(void *ctx);
|
||||||
|
@ -52,6 +49,6 @@ lnm_err lnm_loop_init(lnm_loop **out, void *gctx,
|
||||||
|
|
||||||
lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port);
|
lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port);
|
||||||
|
|
||||||
lnm_err lnm_loop_run(lnm_loop *l);
|
lnm_err lnm_loop_run(lnm_loop *l, int thread_count);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -122,9 +122,9 @@ lnm_err lnm_http_loop_route_add(lnm_http_loop *hl, lnm_http_route *route) {
|
||||||
return lnm_err_ok;
|
return lnm_err_ok;
|
||||||
}
|
}
|
||||||
|
|
||||||
lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port) {
|
lnm_err lnm_http_loop_run(lnm_http_loop *hl, uint16_t port, int thread_count) {
|
||||||
LNM_RES(lnm_loop_setup(hl, port));
|
LNM_RES(lnm_loop_setup(hl, port));
|
||||||
return lnm_loop_run(hl);
|
return lnm_loop_run(hl, thread_count);
|
||||||
}
|
}
|
||||||
|
|
||||||
void lnm_http_loop_set_api_key(lnm_http_loop *hl, const char *api_key) {
|
void lnm_http_loop_set_api_key(lnm_http_loop *hl, const char *api_key) {
|
||||||
|
|
|
@ -29,8 +29,8 @@ void lnm_http_loop_process_parse_req(lnm_http_conn *conn) {
|
||||||
ctx->state = lnm_http_loop_state_route;
|
ctx->state = lnm_http_loop_state_route;
|
||||||
|
|
||||||
lnm_linfo(section, "%s %.*s HTTP/1.%i",
|
lnm_linfo(section, "%s %.*s HTTP/1.%i",
|
||||||
lnm_http_method_names[ctx->req.method], (int)ctx->req.path.len,
|
lnm_http_method_names[ctx->req.method], (int)ctx->req.path.len,
|
||||||
ctx->req.path.s, ctx->req.minor_version);
|
ctx->req.path.s, ctx->req.minor_version);
|
||||||
break;
|
break;
|
||||||
case lnm_http_parse_err_incomplete:
|
case lnm_http_parse_err_incomplete:
|
||||||
// If the request is already the size of the read buffer, we close the
|
// If the request is already the size of the read buffer, we close the
|
||||||
|
|
|
@ -1,12 +1,14 @@
|
||||||
#include <fcntl.h>
|
#include <fcntl.h>
|
||||||
#include <netinet/in.h>
|
#include <netinet/in.h>
|
||||||
#include <poll.h>
|
#include <pthread.h>
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
#include <sys/epoll.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
|
||||||
#include "lnm/common.h"
|
#include "lnm/common.h"
|
||||||
#include "lnm/log.h"
|
#include "lnm/log.h"
|
||||||
|
#include "lnm/loop.h"
|
||||||
#include "lnm/loop_internal.h"
|
#include "lnm/loop_internal.h"
|
||||||
|
|
||||||
static const char *section = "loop";
|
static const char *section = "loop";
|
||||||
|
@ -47,35 +49,18 @@ lnm_err lnm_loop_accept(lnm_loop *l) {
|
||||||
flags |= O_NONBLOCK;
|
flags |= O_NONBLOCK;
|
||||||
fcntl(conn_fd, F_SETFL, flags);
|
fcntl(conn_fd, F_SETFL, flags);
|
||||||
|
|
||||||
// Append connection to list of connections
|
|
||||||
if ((size_t)conn_fd >= l->conns.len) {
|
|
||||||
// We always calloc as a realloc might introduce unitialized values in the
|
|
||||||
// array
|
|
||||||
lnm_loop_conn **new = calloc(sizeof(lnm_loop_conn *), conn_fd + 1);
|
|
||||||
|
|
||||||
if (new == NULL) {
|
|
||||||
close(conn_fd);
|
|
||||||
|
|
||||||
return lnm_err_failed_alloc;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (l->conns.len > 0) {
|
|
||||||
memcpy(new, l->conns.arr, l->conns.len * sizeof(lnm_loop_conn *));
|
|
||||||
free(l->conns.arr);
|
|
||||||
}
|
|
||||||
|
|
||||||
l->conns.arr = new;
|
|
||||||
l->conns.len = conn_fd + 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
lnm_loop_conn *conn;
|
lnm_loop_conn *conn;
|
||||||
LNM_RES2(lnm_loop_conn_init(&conn, l), close(conn_fd));
|
LNM_RES2(lnm_loop_conn_init(&conn, l), close(conn_fd));
|
||||||
|
|
||||||
l->conns.arr[conn_fd] = conn;
|
|
||||||
conn->fd = conn_fd;
|
conn->fd = conn_fd;
|
||||||
conn->state = lnm_loop_state_req;
|
conn->state = lnm_loop_state_req;
|
||||||
|
|
||||||
l->conns.open++;
|
struct epoll_event event = {.data.ptr = conn,
|
||||||
|
.events = EPOLLIN | EPOLLET | EPOLLONESHOT};
|
||||||
|
|
||||||
|
epoll_ctl(l->epoll_fd, EPOLL_CTL_ADD, conn_fd, &event);
|
||||||
|
|
||||||
|
l->open++;
|
||||||
|
|
||||||
lnm_ldebug(section, "connection opened with fd %i", conn_fd);
|
lnm_ldebug(section, "connection opened with fd %i", conn_fd);
|
||||||
|
|
||||||
|
@ -115,96 +100,133 @@ lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port) {
|
||||||
int flags = fcntl(listen_fd, F_GETFL);
|
int flags = fcntl(listen_fd, F_GETFL);
|
||||||
fcntl(listen_fd, F_SETFL, flags | O_NONBLOCK);
|
fcntl(listen_fd, F_SETFL, flags | O_NONBLOCK);
|
||||||
|
|
||||||
|
int epoll_fd = epoll_create1(0);
|
||||||
|
|
||||||
|
if (epoll_fd < 0) {
|
||||||
|
return lnm_err_failed_network;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct epoll_event event = {
|
||||||
|
// The listening socket is marked using a NULL data field
|
||||||
|
.data.ptr = NULL,
|
||||||
|
.events = EPOLLIN | EPOLLET | EPOLLONESHOT};
|
||||||
|
|
||||||
|
res = epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &event);
|
||||||
|
|
||||||
|
if (res < 0) {
|
||||||
|
return lnm_err_failed_network;
|
||||||
|
}
|
||||||
|
|
||||||
l->listen_fd = listen_fd;
|
l->listen_fd = listen_fd;
|
||||||
|
l->epoll_fd = epoll_fd;
|
||||||
|
|
||||||
return lnm_err_ok;
|
return lnm_err_ok;
|
||||||
}
|
}
|
||||||
|
|
||||||
lnm_err lnm_loop_run(lnm_loop *l) {
|
typedef struct lnm_loop_thread_args {
|
||||||
if (l->listen_fd == 0) {
|
lnm_loop *l;
|
||||||
return lnm_err_not_setup;
|
int id;
|
||||||
}
|
int thread_count;
|
||||||
|
} lnm_loop_thread_args;
|
||||||
|
|
||||||
struct pollfd *poll_args =
|
lnm_err lnm_loop_run_thread(lnm_loop_thread_args *args) {
|
||||||
calloc(LNM_LOOP_INITIAL_CONNS + 1, sizeof(struct pollfd));
|
lnm_loop *l = args->l;
|
||||||
size_t poll_args_cap = LNM_LOOP_INITIAL_CONNS + 1;
|
int thread_id = args->id;
|
||||||
|
int thread_count = args->thread_count;
|
||||||
|
|
||||||
if (poll_args == NULL) {
|
struct epoll_event *events = calloc(1, sizeof(struct epoll_event));
|
||||||
|
int events_cap = 1;
|
||||||
|
|
||||||
|
if (events == NULL) {
|
||||||
return lnm_err_failed_alloc;
|
return lnm_err_failed_alloc;
|
||||||
}
|
}
|
||||||
|
|
||||||
// First argument is listening socket
|
lnm_linfo(section, "thread %i started", thread_id);
|
||||||
poll_args[0].fd = l->listen_fd;
|
|
||||||
poll_args[0].events = POLLIN;
|
|
||||||
|
|
||||||
lnm_linfo(section, "started on fd %i", l->listen_fd);
|
struct epoll_event listen_event = {
|
||||||
|
.data.ptr = NULL, .events = EPOLLIN | EPOLLET | EPOLLONESHOT};
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
nfds_t poll_args_len = 1;
|
int polled = epoll_wait(l->epoll_fd, events, events_cap, -1);
|
||||||
|
lnm_ldebug(section, "polled (thread %i): %i", thread_id, polled);
|
||||||
// Add all open connections to the poll command
|
|
||||||
for (size_t i = 0; i < l->conns.len && poll_args_len < l->conns.open + 1;
|
|
||||||
i++) {
|
|
||||||
const lnm_loop_conn *conn = l->conns.arr[i];
|
|
||||||
|
|
||||||
if (conn == NULL) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
|
|
||||||
poll_args[poll_args_len].fd = conn->fd;
|
|
||||||
poll_args[poll_args_len].events =
|
|
||||||
((conn->state == lnm_loop_state_req) ? POLLIN : POLLOUT) | POLLERR;
|
|
||||||
|
|
||||||
poll_args_len++;
|
|
||||||
}
|
|
||||||
|
|
||||||
int polled = poll(poll_args, poll_args_len, -1);
|
|
||||||
|
|
||||||
if (polled < 0) {
|
if (polled < 0) {
|
||||||
return lnm_err_failed_poll;
|
return lnm_err_failed_poll;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (poll_args[0].revents) {
|
for (int i = 0; i < polled; i++) {
|
||||||
lnm_loop_accept(l);
|
if (events[i].data.ptr == NULL) {
|
||||||
polled--;
|
lnm_loop_accept(l);
|
||||||
}
|
|
||||||
|
|
||||||
for (size_t i = 1; i < poll_args_len && polled > 0; i++) {
|
|
||||||
if (poll_args[i].revents) {
|
|
||||||
lnm_loop_conn *conn = l->conns.arr[poll_args[i].fd];
|
|
||||||
|
|
||||||
|
epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, l->listen_fd, &listen_event);
|
||||||
|
} else {
|
||||||
|
lnm_loop_conn *conn = events[i].data.ptr;
|
||||||
lnm_loop_conn_io(l, conn);
|
lnm_loop_conn_io(l, conn);
|
||||||
|
|
||||||
if (conn->state == lnm_loop_state_end) {
|
if (conn->state == lnm_loop_state_end) {
|
||||||
l->conns.arr[conn->fd] = NULL;
|
int conn_fd = conn->fd;
|
||||||
close(conn->fd);
|
|
||||||
|
|
||||||
l->conns.open--;
|
|
||||||
lnm_ldebug(section, "connection closed with fd %i", conn->fd);
|
|
||||||
|
|
||||||
lnm_loop_conn_free(l, conn);
|
lnm_loop_conn_free(l, conn);
|
||||||
}
|
close(conn_fd);
|
||||||
|
l->open--;
|
||||||
|
|
||||||
polled--;
|
epoll_ctl(l->epoll_fd, EPOLL_CTL_DEL, conn_fd, NULL);
|
||||||
|
|
||||||
|
lnm_ldebug(section, "connection closed with fd %i", conn_fd);
|
||||||
|
} else {
|
||||||
|
struct epoll_event event = {
|
||||||
|
.data.ptr = conn,
|
||||||
|
.events =
|
||||||
|
(conn->state == lnm_loop_state_req ? EPOLLIN : EPOLLOUT) |
|
||||||
|
EPOLLET | EPOLLONESHOT};
|
||||||
|
epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, conn->fd, &event);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (poll_args_cap < l->conns.open + 1) {
|
int open = l->open;
|
||||||
struct pollfd *buf = calloc(l->conns.open + 1, sizeof(struct pollfd));
|
int cap_per_thread =
|
||||||
|
open + 1 > thread_count ? (open + 1) / thread_count : 1;
|
||||||
|
|
||||||
if (buf == NULL) {
|
if (cap_per_thread > events_cap) {
|
||||||
|
struct epoll_event *new_events =
|
||||||
|
malloc(cap_per_thread * sizeof(struct epoll_event));
|
||||||
|
|
||||||
|
if (new_events == NULL) {
|
||||||
return lnm_err_failed_alloc;
|
return lnm_err_failed_alloc;
|
||||||
}
|
}
|
||||||
|
|
||||||
buf[0].fd = l->listen_fd;
|
free(events);
|
||||||
buf[0].events = POLLIN;
|
events = new_events;
|
||||||
|
events_cap = cap_per_thread;
|
||||||
free(poll_args);
|
|
||||||
poll_args = buf;
|
|
||||||
|
|
||||||
poll_args_cap = l->conns.open + 1;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return lnm_err_ok;
|
return lnm_err_ok;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
lnm_err lnm_loop_run(lnm_loop *l, int thread_count) {
|
||||||
|
if (l->epoll_fd == 0) {
|
||||||
|
return lnm_err_not_setup;
|
||||||
|
}
|
||||||
|
|
||||||
|
lnm_loop_thread_args args[thread_count];
|
||||||
|
|
||||||
|
for (int i = 1; i < thread_count; i++) {
|
||||||
|
args[i].l = l;
|
||||||
|
args[i].id = i;
|
||||||
|
args[i].thread_count = thread_count;
|
||||||
|
|
||||||
|
pthread_t thread;
|
||||||
|
pthread_create(&thread, NULL, (void *(*)(void *))lnm_loop_run_thread,
|
||||||
|
&args[i]);
|
||||||
|
}
|
||||||
|
|
||||||
|
args[0].l = l;
|
||||||
|
args[0].id = 0;
|
||||||
|
args[0].thread_count = thread_count;
|
||||||
|
|
||||||
|
lnm_loop_run_thread(&args[0]);
|
||||||
|
|
||||||
|
return lnm_err_ok;
|
||||||
|
}
|
||||||
|
|
|
@ -3,8 +3,8 @@
|
||||||
|
|
||||||
#include "lnm/http/consts.h"
|
#include "lnm/http/consts.h"
|
||||||
#include "lnm/http/loop.h"
|
#include "lnm/http/loop.h"
|
||||||
#include "lnm/loop.h"
|
|
||||||
#include "lnm/log.h"
|
#include "lnm/log.h"
|
||||||
|
#include "lnm/loop.h"
|
||||||
#include "lsm/store.h"
|
#include "lsm/store.h"
|
||||||
|
|
||||||
#include "lander.h"
|
#include "lander.h"
|
||||||
|
@ -40,7 +40,8 @@ lnm_http_step_err lander_get_redirect(lnm_http_conn *conn) {
|
||||||
// This shouldn't be able to happen
|
// This shouldn't be able to happen
|
||||||
if (lsm_entry_attr_get(&url_attr_val, c_ctx->entry, lander_attr_type_url) !=
|
if (lsm_entry_attr_get(&url_attr_val, c_ctx->entry, lander_attr_type_url) !=
|
||||||
lsm_error_ok) {
|
lsm_error_ok) {
|
||||||
lnm_lerror("lander", "%s", "Entry of type redirect detected without URL attribute");
|
lnm_lerror("lander", "%s",
|
||||||
|
"Entry of type redirect detected without URL attribute");
|
||||||
|
|
||||||
ctx->res.status = lnm_http_status_internal_server_error;
|
ctx->res.status = lnm_http_status_internal_server_error;
|
||||||
lsm_entry_close(c_ctx->entry);
|
lsm_entry_close(c_ctx->entry);
|
||||||
|
|
|
@ -104,7 +104,7 @@ int main() {
|
||||||
lnm_linfo("main", "Store loaded containing %lu entries",
|
lnm_linfo("main", "Store loaded containing %lu entries",
|
||||||
lsm_store_size(c_gctx->store));
|
lsm_store_size(c_gctx->store));
|
||||||
lnm_http_loop *hl = loop_init(c_gctx, api_key);
|
lnm_http_loop *hl = loop_init(c_gctx, api_key);
|
||||||
lnm_http_loop_run(hl, port);
|
lnm_http_loop_run(hl, port, 1);
|
||||||
|
|
||||||
/* http_loop *hl = http_loop_init( */
|
/* http_loop *hl = http_loop_init( */
|
||||||
/* lander_routes, sizeof(lander_routes) / sizeof(lander_routes[0]),
|
/* lander_routes, sizeof(lander_routes) / sizeof(lander_routes[0]),
|
||||||
|
|
Loading…
Reference in New Issue