diff --git a/lnm/include/lnm/common.h b/lnm/include/lnm/common.h index 26a3932..1500d73 100644 --- a/lnm/include/lnm/common.h +++ b/lnm/include/lnm/common.h @@ -22,6 +22,7 @@ typedef enum { lnm_err_failed_alloc, lnm_err_failed_network, lnm_err_failed_poll, + lnm_err_not_setup, } lnm_err; #endif diff --git a/lnm/include/lnm/loop.h b/lnm/include/lnm/loop.h index c0bce0b..d70057e 100644 --- a/lnm/include/lnm/loop.h +++ b/lnm/include/lnm/loop.h @@ -27,11 +27,11 @@ typedef struct { struct { char buf[LNM_LOOP_BUF_SIZE]; size_t size; - size_t written; } w; } lnm_loop_conn; typedef struct { + int listen_fd; struct { lnm_loop_conn **arr; size_t len; @@ -40,12 +40,16 @@ typedef struct { void *gctx; lnm_err (*ctx_init)(void **out, void *gctx); void (*ctx_free)(void *ctx); + void (*data_read)(lnm_loop_conn *conn); + void (*data_write)(lnm_loop_conn *conn); } lnm_loop; lnm_err lnm_loop_init(lnm_loop **out, void *gctx, lnm_err (*ctx_init)(void **out, void *gctx), void (*ctx_free)(void *ctx)); -lnm_err lnm_loop_run(lnm_loop *l, uint16_t port); +lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port); + +lnm_err lnm_loop_run(lnm_loop *l); #endif diff --git a/lnm/src/_include/lnm/loop_internal.h b/lnm/src/_include/lnm/loop_internal.h index 0c91535..6b723fe 100644 --- a/lnm/src/_include/lnm/loop_internal.h +++ b/lnm/src/_include/lnm/loop_internal.h @@ -5,3 +5,5 @@ lnm_err lnm_loop_conn_init(lnm_loop_conn **out, lnm_loop *l); void lnm_loop_conn_free(lnm_loop *l, lnm_loop_conn *conn); lnm_err lnm_loop_accept(lnm_loop *l); + +void lnm_loop_conn_io(lnm_loop *l, lnm_loop_conn *conn); diff --git a/lnm/src/loop/lnm_loop.c b/lnm/src/loop/lnm_loop.c index d835824..63e3b34 100644 --- a/lnm/src/loop/lnm_loop.c +++ b/lnm/src/loop/lnm_loop.c @@ -1,3 +1,4 @@ +#include #include #include #include @@ -23,7 +24,46 @@ lnm_err lnm_loop_init(lnm_loop **out, void *gctx, return lnm_err_ok; } -lnm_err lnm_loop_run(lnm_loop *l, uint16_t port) { +lnm_err lnm_loop_accept(lnm_loop *l) { + int conn_fd = accept(l->listen_fd, NULL, NULL); + + if (conn_fd < 0) { + return lnm_err_failed_network; + } + + // Set socket to non-blocking + int flags = fcntl(conn_fd, F_GETFL); + flags |= O_NONBLOCK; + fcntl(conn_fd, F_SETFL, flags); + + // Append connection to list of connections + if ((size_t)conn_fd >= l->conns.len) { + lnm_loop_conn **new = + realloc(l->conns.arr, sizeof(lnm_loop_conn *) * (conn_fd + 1)); + + if (new == NULL) { + close(conn_fd); + + return lnm_err_failed_alloc; + } + + l->conns.arr = new; + l->conns.len = conn_fd + 1; + } + + lnm_loop_conn *conn; + LNM_RES2(lnm_loop_conn_init(&conn, l), close(conn_fd)); + + l->conns.arr[conn_fd] = conn; + conn->fd = conn_fd; + conn->state = lnm_loop_state_req; + + l->conns.open++; + + return lnm_err_ok; +} + +lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port) { int listen_fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0); if (listen_fd < 0) { @@ -53,6 +93,16 @@ lnm_err lnm_loop_run(lnm_loop *l, uint16_t port) { return lnm_err_failed_network; } + l->listen_fd = listen_fd; + + return lnm_err_ok; +} + +lnm_err lnm_loop_run(lnm_loop *l) { + if (l->listen_fd == 0) { + return lnm_err_not_setup; + } + struct pollfd *poll_args = malloc((LNM_LOOP_INITIAL_CONNS + 1) * sizeof(struct pollfd)); size_t poll_args_cap = LNM_LOOP_INITIAL_CONNS + 1; @@ -62,14 +112,14 @@ lnm_err lnm_loop_run(lnm_loop *l, uint16_t port) { } // First argument is listening socket - poll_args[0].fd = listen_fd; + poll_args[0].fd = l->listen_fd; poll_args[0].events = POLLIN; while (1) { size_t poll_args_len = 1; // Add all open connections to the poll command - for (size_t i = 0; i < l->conns.len && poll_args_len <= l->conns.open; + for (size_t i = 0; i < l->conns.len && poll_args_len < l->conns.open + 1; i++) { lnm_loop_conn *conn = l->conns.arr[i]; @@ -99,7 +149,7 @@ lnm_err lnm_loop_run(lnm_loop *l, uint16_t port) { if (poll_args[i].revents) { lnm_loop_conn *conn = l->conns.arr[poll_args[i].fd]; - // TODO actual IO + lnm_loop_conn_io(l, conn); if (conn->state == lnm_loop_state_end) { l->conns.arr[conn->fd] = NULL; @@ -120,7 +170,7 @@ lnm_err lnm_loop_run(lnm_loop *l, uint16_t port) { return lnm_err_failed_alloc; } - buf[0].fd = listen_fd; + buf[0].fd = l->listen_fd; buf[0].events = POLLIN; free(poll_args); diff --git a/lnm/src/loop/lnm_loop_io.c b/lnm/src/loop/lnm_loop_io.c new file mode 100644 index 0000000..5d89191 --- /dev/null +++ b/lnm/src/loop/lnm_loop_io.c @@ -0,0 +1,77 @@ +#include +#include +#include + +#include "lnm/loop.h" +#include "lnm/loop_internal.h" + +void lnm_loop_conn_io_req(lnm_loop *l, lnm_loop_conn *conn) { + do { + // Move remaining data to front of buffer + memmove(conn->r.buf, &conn->r.buf[conn->r.read], + conn->r.size - conn->r.read); + conn->r.size -= conn->r.read; + conn->r.read = 0; + + ssize_t res; + size_t cap = LNM_LOOP_BUF_SIZE - conn->r.size; + + do { + res = read(conn->fd, &conn->r.buf[conn->r.size], cap); + } while (res < 0 && errno == EINTR); + + // Read can't be performed without blocking; we come back later + if (res < 0 && errno == EAGAIN) { + return; + } + + if (res <= 0) { + conn->state = lnm_loop_state_end; + + return; + } + + conn->r.size += res; + l->data_read(conn); + } while (conn->state == lnm_loop_state_req); +} + +void lnm_loop_conn_io_res(lnm_loop *l, lnm_loop_conn *conn) { + do { + ssize_t res; + + do { + res = write(conn->fd, conn->w.buf, conn->w.size); + } while (res < 0 && errno == EINTR); + + // Write can't be performed without blocking; we come back later + if (res < 0 && errno == EAGAIN) { + return; + } + + if (res <= 0) { + conn->state = lnm_loop_state_end; + + return; + } + + // Move remaining data to front of buffer. Doing this here gives the data + // writer function more space to work with + memmove(conn->w.buf, &conn->w.buf[res], conn->w.size - res); + conn->w.size -= res; + + l->data_write(conn); + } while (conn->state == lnm_loop_state_res); +} + +void lnm_loop_conn_io(lnm_loop *l, lnm_loop_conn *conn) { + switch (conn->state) { + case lnm_loop_state_req: + lnm_loop_conn_io_req(l, conn); + break; + case lnm_loop_state_res: + lnm_loop_conn_io_res(l, conn); + break; + default:; + } +}