feat(lnm): wrote rest of event loop
ci/woodpecker/push/build Pipeline was successful Details

lnm
Jef Roosens 2023-11-22 12:59:28 +01:00
parent 8a3be2b07c
commit f79ba2818c
Signed by: Jef Roosens
GPG Key ID: B75D4F293C7052DB
5 changed files with 141 additions and 7 deletions

View File

@ -22,6 +22,7 @@ typedef enum {
lnm_err_failed_alloc, lnm_err_failed_alloc,
lnm_err_failed_network, lnm_err_failed_network,
lnm_err_failed_poll, lnm_err_failed_poll,
lnm_err_not_setup,
} lnm_err; } lnm_err;
#endif #endif

View File

@ -27,11 +27,11 @@ typedef struct {
struct { struct {
char buf[LNM_LOOP_BUF_SIZE]; char buf[LNM_LOOP_BUF_SIZE];
size_t size; size_t size;
size_t written;
} w; } w;
} lnm_loop_conn; } lnm_loop_conn;
typedef struct { typedef struct {
int listen_fd;
struct { struct {
lnm_loop_conn **arr; lnm_loop_conn **arr;
size_t len; size_t len;
@ -40,12 +40,16 @@ typedef struct {
void *gctx; void *gctx;
lnm_err (*ctx_init)(void **out, void *gctx); lnm_err (*ctx_init)(void **out, void *gctx);
void (*ctx_free)(void *ctx); void (*ctx_free)(void *ctx);
void (*data_read)(lnm_loop_conn *conn);
void (*data_write)(lnm_loop_conn *conn);
} lnm_loop; } lnm_loop;
lnm_err lnm_loop_init(lnm_loop **out, void *gctx, lnm_err lnm_loop_init(lnm_loop **out, void *gctx,
lnm_err (*ctx_init)(void **out, void *gctx), lnm_err (*ctx_init)(void **out, void *gctx),
void (*ctx_free)(void *ctx)); void (*ctx_free)(void *ctx));
lnm_err lnm_loop_run(lnm_loop *l, uint16_t port); lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port);
lnm_err lnm_loop_run(lnm_loop *l);
#endif #endif

View File

@ -5,3 +5,5 @@ lnm_err lnm_loop_conn_init(lnm_loop_conn **out, lnm_loop *l);
void lnm_loop_conn_free(lnm_loop *l, lnm_loop_conn *conn); void lnm_loop_conn_free(lnm_loop *l, lnm_loop_conn *conn);
lnm_err lnm_loop_accept(lnm_loop *l); lnm_err lnm_loop_accept(lnm_loop *l);
void lnm_loop_conn_io(lnm_loop *l, lnm_loop_conn *conn);

View File

@ -1,3 +1,4 @@
#include <fcntl.h>
#include <netinet/in.h> #include <netinet/in.h>
#include <poll.h> #include <poll.h>
#include <unistd.h> #include <unistd.h>
@ -23,7 +24,46 @@ lnm_err lnm_loop_init(lnm_loop **out, void *gctx,
return lnm_err_ok; return lnm_err_ok;
} }
lnm_err lnm_loop_run(lnm_loop *l, uint16_t port) { lnm_err lnm_loop_accept(lnm_loop *l) {
int conn_fd = accept(l->listen_fd, NULL, NULL);
if (conn_fd < 0) {
return lnm_err_failed_network;
}
// Set socket to non-blocking
int flags = fcntl(conn_fd, F_GETFL);
flags |= O_NONBLOCK;
fcntl(conn_fd, F_SETFL, flags);
// Append connection to list of connections
if ((size_t)conn_fd >= l->conns.len) {
lnm_loop_conn **new =
realloc(l->conns.arr, sizeof(lnm_loop_conn *) * (conn_fd + 1));
if (new == NULL) {
close(conn_fd);
return lnm_err_failed_alloc;
}
l->conns.arr = new;
l->conns.len = conn_fd + 1;
}
lnm_loop_conn *conn;
LNM_RES2(lnm_loop_conn_init(&conn, l), close(conn_fd));
l->conns.arr[conn_fd] = conn;
conn->fd = conn_fd;
conn->state = lnm_loop_state_req;
l->conns.open++;
return lnm_err_ok;
}
lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port) {
int listen_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0); int listen_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
if (listen_fd < 0) { if (listen_fd < 0) {
@ -53,6 +93,16 @@ lnm_err lnm_loop_run(lnm_loop *l, uint16_t port) {
return lnm_err_failed_network; return lnm_err_failed_network;
} }
l->listen_fd = listen_fd;
return lnm_err_ok;
}
lnm_err lnm_loop_run(lnm_loop *l) {
if (l->listen_fd == 0) {
return lnm_err_not_setup;
}
struct pollfd *poll_args = struct pollfd *poll_args =
malloc((LNM_LOOP_INITIAL_CONNS + 1) * sizeof(struct pollfd)); malloc((LNM_LOOP_INITIAL_CONNS + 1) * sizeof(struct pollfd));
size_t poll_args_cap = LNM_LOOP_INITIAL_CONNS + 1; size_t poll_args_cap = LNM_LOOP_INITIAL_CONNS + 1;
@ -62,14 +112,14 @@ lnm_err lnm_loop_run(lnm_loop *l, uint16_t port) {
} }
// First argument is listening socket // First argument is listening socket
poll_args[0].fd = listen_fd; poll_args[0].fd = l->listen_fd;
poll_args[0].events = POLLIN; poll_args[0].events = POLLIN;
while (1) { while (1) {
size_t poll_args_len = 1; size_t poll_args_len = 1;
// Add all open connections to the poll command // Add all open connections to the poll command
for (size_t i = 0; i < l->conns.len && poll_args_len <= l->conns.open; for (size_t i = 0; i < l->conns.len && poll_args_len < l->conns.open + 1;
i++) { i++) {
lnm_loop_conn *conn = l->conns.arr[i]; lnm_loop_conn *conn = l->conns.arr[i];
@ -99,7 +149,7 @@ lnm_err lnm_loop_run(lnm_loop *l, uint16_t port) {
if (poll_args[i].revents) { if (poll_args[i].revents) {
lnm_loop_conn *conn = l->conns.arr[poll_args[i].fd]; lnm_loop_conn *conn = l->conns.arr[poll_args[i].fd];
// TODO actual IO lnm_loop_conn_io(l, conn);
if (conn->state == lnm_loop_state_end) { if (conn->state == lnm_loop_state_end) {
l->conns.arr[conn->fd] = NULL; l->conns.arr[conn->fd] = NULL;
@ -120,7 +170,7 @@ lnm_err lnm_loop_run(lnm_loop *l, uint16_t port) {
return lnm_err_failed_alloc; return lnm_err_failed_alloc;
} }
buf[0].fd = listen_fd; buf[0].fd = l->listen_fd;
buf[0].events = POLLIN; buf[0].events = POLLIN;
free(poll_args); free(poll_args);

View File

@ -0,0 +1,77 @@
#include <errno.h>
#include <string.h>
#include <unistd.h>
#include "lnm/loop.h"
#include "lnm/loop_internal.h"
void lnm_loop_conn_io_req(lnm_loop *l, lnm_loop_conn *conn) {
do {
// Move remaining data to front of buffer
memmove(conn->r.buf, &conn->r.buf[conn->r.read],
conn->r.size - conn->r.read);
conn->r.size -= conn->r.read;
conn->r.read = 0;
ssize_t res;
size_t cap = LNM_LOOP_BUF_SIZE - conn->r.size;
do {
res = read(conn->fd, &conn->r.buf[conn->r.size], cap);
} while (res < 0 && errno == EINTR);
// Read can't be performed without blocking; we come back later
if (res < 0 && errno == EAGAIN) {
return;
}
if (res <= 0) {
conn->state = lnm_loop_state_end;
return;
}
conn->r.size += res;
l->data_read(conn);
} while (conn->state == lnm_loop_state_req);
}
void lnm_loop_conn_io_res(lnm_loop *l, lnm_loop_conn *conn) {
do {
ssize_t res;
do {
res = write(conn->fd, conn->w.buf, conn->w.size);
} while (res < 0 && errno == EINTR);
// Write can't be performed without blocking; we come back later
if (res < 0 && errno == EAGAIN) {
return;
}
if (res <= 0) {
conn->state = lnm_loop_state_end;
return;
}
// Move remaining data to front of buffer. Doing this here gives the data
// writer function more space to work with
memmove(conn->w.buf, &conn->w.buf[res], conn->w.size - res);
conn->w.size -= res;
l->data_write(conn);
} while (conn->state == lnm_loop_state_res);
}
void lnm_loop_conn_io(lnm_loop *l, lnm_loop_conn *conn) {
switch (conn->state) {
case lnm_loop_state_req:
lnm_loop_conn_io_req(l, conn);
break;
case lnm_loop_state_res:
lnm_loop_conn_io_res(l, conn);
break;
default:;
}
}