diff --git a/Makefile b/Makefile index 3711c13..43ad4a7 100644 --- a/Makefile +++ b/Makefile @@ -6,18 +6,21 @@ BIN_FILENAME ?= lander BUILD_DIR ?= build SRC_DIR ?= src TEST_DIR ?= test -INC_DIRS ?= include +THIRDPARTY_DIR ?= thirdparty +INC_DIRS ?= include $(THIRDPARTY_DIR)/include BIN := $(BUILD_DIR)/$(BIN_FILENAME) SRCS != find '$(SRC_DIR)' -iname '*.c' -SRCS_H != find $(INC_DIRS) -iname '*.h' -SRCS_H_INTERNAL != find $(SRC_DIR) -iname '*.h' SRCS_TEST != find '$(TEST_DIR)' -iname '*.c' +SRCS_THIRDPARTY != find '$(THIRDPARTY_DIR)/src' -iname '*.c' -OBJS := $(SRCS:%=$(BUILD_DIR)/%.o) +SRCS_H != find include -iname '*.h' +SRCS_H_INTERNAL != find $(SRC_DIR) -iname '*.h' + +OBJS := $(SRCS:%=$(BUILD_DIR)/%.o) $(SRCS_THIRDPARTY:%=$(BUILD_DIR)/%.o) OBJS_TEST := $(SRCS_TEST:%=$(BUILD_DIR)/%.o) -DEPS := $(SRCS:%=$(BUILD_DIR)/%.d) $(SRCS_TEST:%=$(BUILD_DIR)/%.d) +DEPS := $(SRCS:%=$(BUILD_DIR)/%.d) $(SRCS_THIRDPARTY:%=$(BUILD_DIR)/%.d) $(SRCS_TEST:%=$(BUILD_DIR)/%.d) BINS_TEST := $(OBJS_TEST:%.c.o=%) TARGETS_TEST := $(BINS_TEST:%=test-%) @@ -51,6 +54,10 @@ $(BUILD_DIR)/$(SRC_DIR)/%.c.o: $(SRC_DIR)/%.c mkdir -p $(dir $@) $(CC) $(INTERNALCFLAGS) -c $< -o $@ +$(BUILD_DIR)/$(THIRDPARTY_DIR)/%.c.o: $(THIRDPARTY_DIR)/%.c + mkdir -p $(dir $@) + $(CC) $(INTERNALCFLAGS) -c $< -o $@ + # =====TESTING===== .PHONY: test diff --git a/include/event_loop.h b/include/event_loop.h new file mode 100644 index 0000000..4912e44 --- /dev/null +++ b/include/event_loop.h @@ -0,0 +1,33 @@ +#ifndef LANDER_EVENT_LOOP +#define LANDER_EVENT_LOOP + +// Size of the read and write buffers for each connection, in bytes +#define EVENT_LOOP_BUFFER_SIZE 1024 + +/** + * Represents an active connection managed by the event loop + */ +typedef struct event_loop_conn event_loop_conn; + +typedef enum { + event_loop_conn_state_req = 0, + event_loop_conn_state_res = 1, + event_loop_conn_state_end = 2, +} event_loop_conn_state; + +/* + * Main struct object representing the event loop + */ +typedef struct event_loop event_loop; + +/* + * Initialize a new event loop + */ +event_loop *event_loop_init(); + +/* + * Run the event loop. This function never returns. + */ +void event_loop_run(event_loop *el, int port); + +#endif diff --git a/src/event_loop.c b/src/event_loop.c deleted file mode 100644 index 2b0c333..0000000 --- a/src/event_loop.c +++ /dev/null @@ -1,339 +0,0 @@ -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include "picohttpparser.h" - -#define MAX_MSG_SIZE 1024 - -const char http_200_ok[] = - "HTTP/1.1 200 OK\n" - "Connection: close\n"; - -static void fd_set_nb(int fd) { - int flags = fcntl(fd, F_GETFL, 0); - /* if (errno) { */ - /* die("fcntl error"); */ - /* return; */ - /* } */ - - flags |= O_NONBLOCK; - - fcntl(fd, F_SETFL, flags); - /* if (errno) { */ - /* die("fcntl error"); */ - /* } */ -} - -enum { - STATE_REQ = 0, - STATE_RES = 1, - STATE_END = 2, -}; - -typedef struct conn { - int fd; - uint32_t state; - // buffer for reading - size_t rbuf_size; - uint8_t rbuf[MAX_MSG_SIZE]; - // buffer for writing - size_t wbuf_size; - size_t wbuf_sent; - uint8_t wbuf[MAX_MSG_SIZE]; - void (*process_func) (struct conn *); -} conn; - -typedef struct event_loop { - conn **connections; - size_t connection_count; -} event_loop; - -void event_loop_put(event_loop *loop, conn *c) { - // TODO properly catch realloc - if (c->fd >= loop->connection_count) { - loop->connections = realloc(loop->connections, sizeof(conn) * (c->fd + 1)); - loop->connection_count = c->fd + 1; - } - - printf("Add fd %i\n", c->fd); - - loop->connections[c->fd] = c; -} - -int event_loop_accept(event_loop *loop, int fd) { - struct sockaddr_in client_addr; - socklen_t socklen = sizeof(client_addr); - int connfd = accept(fd, (struct sockaddr *)&client_addr, &socklen); - if (connfd < 0) { - printf("accept() error"); - return -1; // error - } - - // set the new connection fd to nonblocking mode - fd_set_nb(connfd); - - // creating the struct Conn - conn *c = calloc(sizeof(conn), 1); - - if (!c) { - close(connfd); - return -1; - } - - c->fd = connfd; - c->state = STATE_REQ; - - event_loop_put(loop, c); - return 0; -} - -bool conn_write_to_fd(conn *c) { - ssize_t res = 0; - size_t remain = c->wbuf_size - c->wbuf_sent; - - do { - res = write(c->fd, &c->wbuf[c->wbuf_sent], remain); - } while (res < 0 && errno == EINTR); - - // EAGAIN doesn't mean there was an error, but rather that there's no more - // data right now, but there might be more later, aka "try again later" - if (res < 0 && errno == EAGAIN) { - return false; - } - - // If it's not EGAIN, there was an error writing so we simply end the request - if (res < 0) { - c->state = STATE_END; - return false; - } - - c->wbuf_sent += (size_t)res; - - // Everything is written from the buffer, so we exit - if (c->wbuf_sent == c->wbuf_size) { - c->state = STATE_END; - /* c->wbuf_sent = 0; */ - /* c->wbuf_size = 0; */ - - return false; - } - - // still got some data in wbuf, could try to write again - return true; -} - -void try_one_request(conn *c) { - if (c->process_func != NULL) { - c->process_func(c); - } - - char *method, *path; - struct phr_header headers[16]; - size_t method_len, path_len, num_headers; - int minor_version; - - num_headers = sizeof(headers) / sizeof(headers[0]); - - int res = phr_parse_request((const char *) c->rbuf, c->rbuf_size, &method, &method_len, &path, &path_len, &minor_version, headers, &num_headers, 0); - - if (res > 0) { - - } else if (res == -1) { - c->state = STATE_END; - } else if (res == -2) { - // We don't do anything here - } -} - -/** - * Read new data into the read buffer. This command performs at most one - * successful read syscall. - * - * Returns whether the function should be retried immediately or not. - */ -bool conn_read_from_fd(conn *c) { - ssize_t res; - size_t cap = MAX_MSG_SIZE - c->rbuf_size; - - // Try to read at most cap bytes from the file descriptor - do { - res = read(c->fd, &c->rbuf[c->rbuf_size], cap); - } while (res < 0 && errno == EINTR); - - // EGAIN means we try again later - if (res < 0 && errno == EAGAIN) { - return false; - } - - // Any other negative error message means the read errored out - if (res < 0) { - c->state = STATE_END; - - return false; - } - - // An output of 0 zero means we've reached the end of the input - if (res == 0) { - } - - // We switch to processing mode if we've reached the end of the data stream, - // or if the read buffer is filled - /* if (res == 0 || c->rbuf_size == MAX_MSG_SIZE) { */ - /* c->state = STATE_PROCESS; */ - /* return false; */ - /* } */ - - c->rbuf_size += (size_t)res; - printf("rbuf size: %lu", c->rbuf_size); - - /* assert(conn->rbuf_size <= sizeof(conn->rbuf)); */ - - // Try to process requests one by one. - // Try to process requests one by one. - // Why is there a loop? Please read the explanation of "pipelining". - try_one_request(c); - - // We can keep reading as long as we're in request mode - return c->state == STATE_REQ; - /* while (try_one_request(conn)) {} */ - /* return (conn->state == STATE_REQ); */ -} - -void conn_state_res(conn *c) { - while (conn_write_to_fd(c)) {} -} - -void conn_state_req(conn *c) { - while (conn_read_from_fd(c)) {} -} - -/* void conn_state_process(conn *c) { */ -/* printf("bruh"); */ -/* memcpy(c->wbuf, c->rbuf, c->rbuf_size); */ -/* c->wbuf_size = c->rbuf_size; */ -/* c->state = STATE_WRITE; */ -/* } */ - - -static void connection_io(conn *c) { - c->rbuf[c->rbuf_size - 1] = '\0'; - printf("%s\n", c->rbuf); - switch (c->state) { - case STATE_REQ: - conn_state_req(c); break; - case STATE_RES: - conn_state_res(c); break; - } - printf("%i\n", c->state); -} - -int main() { - setvbuf(stdout, NULL, _IONBF, 0); - int fd = socket(AF_INET, SOCK_STREAM, 0); - if (fd < 0) { - return -1; - /* die("socket()"); */ - } - - int val = 1; - setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)); - - // bind - struct sockaddr_in addr = {}; - addr.sin_family = AF_INET; - addr.sin_port = ntohs(8000); - addr.sin_addr.s_addr = ntohl(0); // wildcard address 0.0.0.0 - int rv = bind(fd, (const struct sockaddr *)&addr, sizeof(addr)); - if (rv) { - /* die("bind()"); */ - return -1; - } - - // listen - rv = listen(fd, SOMAXCONN); - if (rv) { - /* die("listen()"); */ - return -1; - } - - // set the listen fd to nonblocking mode - fd_set_nb(fd); - - event_loop *loop = calloc(sizeof(event_loop), 1); - loop->connections = calloc(sizeof(conn), 1); - loop->connection_count = 1; - - struct pollfd *poll_args = calloc(sizeof(struct pollfd), 32); - size_t poll_args_count; - - // for convenience, the listening fd is put in the first position - struct pollfd pfd = {fd, POLLIN, 0}; - poll_args[0] = pfd; - - conn *c; - int events; - - while (1) { - poll_args_count = 1; - - // connection fds - for (size_t i = 0; i < loop->connection_count; i++) { - c = loop->connections[i]; - - if (!c) { - continue; - } - - events = (c->state == STATE_REQ) ? POLLIN : POLLOUT; - events |= POLLERR; - - struct pollfd pfd = {c->fd, events, 0}; - - poll_args[poll_args_count] = pfd; - poll_args_count++; - - // We do at most 32 connections at a time for now - if (poll_args_count == 32) - break; - } - - // poll for active fds - // the timeout argument doesn't matter here - int rv = poll(poll_args, (nfds_t)poll_args_count, 1000); - if (rv < 0) { - /* die("poll"); */ - return -1; - } - - // process active connections - for (size_t i = 1; i < poll_args_count; ++i) { - if (poll_args[i].revents) { - conn *c = loop->connections[poll_args[i].fd]; - connection_io(c); - - if (c->state == STATE_END) { - // client closed normally, or something bad happened. - // destroy this connection - loop->connections[c->fd] = NULL; - close(c->fd); - free(c); - } - } - } - - // try to accept a new connection if the listening fd is active - if (poll_args[0].revents) { - (void)event_loop_accept(loop, fd); - } - } - - return 0; -} diff --git a/src/event_loop/event_loop.c b/src/event_loop/event_loop.c new file mode 100644 index 0000000..b653efa --- /dev/null +++ b/src/event_loop/event_loop.c @@ -0,0 +1,205 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "event_loop_internal.h" +#include "picohttpparser.h" + +const char http_200_ok[] = "HTTP/1.1 200 OK\n" + "Connection: close\n"; + +static int event_loop_fd_set_nb(int fd) { + int flags = fcntl(fd, F_GETFL, 0); + + if (errno) { + return -1; + } + + flags |= O_NONBLOCK; + + fcntl(fd, F_SETFL, flags); + + if (errno) { + return -1; + } + + return 0; +} + +event_loop *event_loop_init() { + event_loop *el = calloc(sizeof(event_loop), 1); + + // No idea if this is a good starter value + el->connections = calloc(sizeof(event_loop_conn), 16); + el->connection_count = 16; + + return el; +} + +int event_loop_put(event_loop *el, event_loop_conn *conn) { + if ((size_t)conn->fd >= el->connection_count) { + event_loop_conn **resized = + realloc(el->connections, sizeof(event_loop_conn) * (conn->fd + 1)); + + if (resized == NULL) { + return -1; + } + + el->connections = resized; + el->connection_count = conn->fd + 1; + } + + el->connections[conn->fd] = conn; + + return 0; +} + +int event_loop_accept(event_loop *loop, int fd) { + struct sockaddr_in client_addr; + socklen_t socklen = sizeof(client_addr); + int connfd = accept(fd, (struct sockaddr *)&client_addr, &socklen); + + if (connfd < 0) { + return -1; + } + + // set the new connection fd to nonblocking mode + int res = event_loop_fd_set_nb(connfd); + + if (res < 0) { + close(connfd); + + return -2; + } + + // creating the struct Conn + event_loop_conn *conn = calloc(sizeof(event_loop_conn), 1); + + // Close the connectoin if we fail to allocate a connection struct + if (conn == NULL) { + close(connfd); + + return -3; + } + + conn->fd = connfd; + conn->state = event_loop_conn_state_req; + + res = event_loop_put(loop, conn); + + if (res != 0) { + close(connfd); + + return -4; + } + + return 0; +} + +void event_loop_run(event_loop *el, int port) { + int fd = socket(AF_INET, SOCK_STREAM, 0); + + if (fd < 0) { + return; + } + + int val = 1; + setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(val)); + + // bind + struct sockaddr_in addr = {.sin_family = AF_INET, + .sin_port = ntohs(port), + .sin_addr.s_addr = ntohl(0)}; + + int res = bind(fd, (const struct sockaddr *)&addr, sizeof(addr)); + + if (res) { + return; + } + + res = listen(fd, SOMAXCONN); + + if (res) { + return; + } + + // The listening socket is always poll'ed in non-blocking mode as well + res = event_loop_fd_set_nb(fd); + + if (res != 0) { + return; + } + + // TODO don't hardcode the number 32 + struct pollfd *poll_args = calloc(sizeof(struct pollfd), 32); + size_t poll_args_count; + + // for convenience, the listening fd is put in the first position + struct pollfd pfd = {fd, POLLIN, 0}; + poll_args[0] = pfd; + + event_loop_conn *conn; + int events; + + while (1) { + poll_args_count = 1; + + // connection fds + for (size_t i = 0; i < el->connection_count; i++) { + conn = el->connections[i]; + + if (conn == NULL) { + continue; + } + + events = (conn->state == event_loop_conn_state_req) ? POLLIN : POLLOUT; + events |= POLLERR; + + struct pollfd pfd = {conn->fd, events, 0}; + + poll_args[poll_args_count] = pfd; + poll_args_count++; + + // We do at most 32 connections at a time for now + if (poll_args_count == 32) + break; + } + + // poll for active fds + // the timeout argument doesn't matter here + int rv = poll(poll_args, (nfds_t)poll_args_count, 1000); + + if (rv < 0) { + return; + } + + // process active connections + for (size_t i = 1; i < poll_args_count; ++i) { + if (poll_args[i].revents) { + conn = el->connections[poll_args[i].fd]; + event_loop_conn_io(conn); + + if (conn->state == event_loop_conn_state_end) { + // client closed normally, or something bad happened. + // destroy this connection + el->connections[conn->fd] = NULL; + close(conn->fd); + free(conn); + } + } + } + + // try to accept a new connection if the listening fd is active + if (poll_args[0].revents) { + (void)event_loop_accept(el, fd); + } + } +} diff --git a/src/event_loop/event_loop_conn.c b/src/event_loop/event_loop_conn.c new file mode 100644 index 0000000..4a299d8 --- /dev/null +++ b/src/event_loop/event_loop_conn.c @@ -0,0 +1,149 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "picohttpparser.h" + +#include "event_loop_internal.h" + +bool event_loop_conn_write_to_fd(event_loop_conn *conn) { + ssize_t res = 0; + size_t remain = conn->wbuf_size - conn->wbuf_sent; + + do { + res = write(conn->fd, &conn->wbuf[conn->wbuf_sent], remain); + } while (res < 0 && errno == EINTR); + + // EAGAIN doesn't mean there was an error, but rather that there's no more + // data right now, but there might be more later, aka "try again later" + if (res < 0 && errno == EAGAIN) { + return false; + } + + // If it's not EGAIN, there was an error writing so we simply end the request + if (res < 0) { + conn->state = event_loop_conn_state_end; + return false; + } + + conn->wbuf_sent += (size_t)res; + + // Everything is written from the buffer, so we exit + if (conn->wbuf_sent == conn->wbuf_size) { + conn->state = event_loop_conn_state_end; + /* c->wbuf_sent = 0; */ + /* c->wbuf_size = 0; */ + + return false; + } + + // still got some data in wbuf, could try to write again + return true; +} + +void try_one_request(event_loop_conn *conn) { + if (conn->process_func != NULL) { + conn->process_func(conn); + } + + char *method, *path; + struct phr_header headers[16]; + size_t method_len, path_len, num_headers; + int minor_version; + + num_headers = sizeof(headers) / sizeof(headers[0]); + + int res = phr_parse_request((const char *)conn->rbuf, conn->rbuf_size, + &method, &method_len, &path, &path_len, + &minor_version, headers, &num_headers, 0); + + if (res > 0) { + + } else if (res == -1) { + conn->state = event_loop_conn_state_end; + } else if (res == -2) { + // We don't do anything here + } +} + +/** + * Read new data into the read buffer. This command performs at most one + * successful read syscall. + * + * Returns whether the function should be retried immediately or not. + */ +bool event_loop_conn_read_from_fd(event_loop_conn *conn) { + ssize_t res; + size_t cap = EVENT_LOOP_BUFFER_SIZE - conn->rbuf_size; + + // Try to read at most cap bytes from the file descriptor + do { + res = read(conn->fd, &conn->rbuf[conn->rbuf_size], cap); + } while (res < 0 && errno == EINTR); + + // EGAIN means we try again later + if (res < 0 && errno == EAGAIN) { + return false; + } + + // Any other negative error message means the read errored out + if (res < 0) { + conn->state = event_loop_conn_state_end; + + return false; + } + + // An output of 0 zero means we've reached the end of the input + if (res == 0) { + } + + // We switch to processing mode if we've reached the end of the data stream, + // or if the read buffer is filled + /* if (res == 0 || c->rbuf_size == MAX_MSG_SIZE) { */ + /* c->state = STATE_PROCESS; */ + /* return false; */ + /* } */ + + conn->rbuf_size += (size_t)res; + + /* assert(conn->rbuf_size <= sizeof(conn->rbuf)); */ + + // Try to process requests one by one. + // Try to process requests one by one. + // Why is there a loop? Please read the explanation of "pipelining". + try_one_request(conn); + + // We can keep reading as long as we're in request mode + return conn->state == event_loop_conn_state_req; + /* while (try_one_request(conn)) {} */ + /* return (conn->state == STATE_REQ); */ +} + +void conn_state_res(event_loop_conn *conn) { + while (event_loop_conn_write_to_fd(conn)) { + } +} + +void conn_state_req(event_loop_conn *conn) { + while (event_loop_conn_read_from_fd(conn)) { + } +} + +void event_loop_conn_io(event_loop_conn *conn) { + switch (conn->state) { + case event_loop_conn_state_req: + conn_state_req(conn); + break; + case event_loop_conn_state_res: + conn_state_res(conn); + break; + } +} diff --git a/src/event_loop/event_loop_internal.h b/src/event_loop/event_loop_internal.h new file mode 100644 index 0000000..9189ce7 --- /dev/null +++ b/src/event_loop/event_loop_internal.h @@ -0,0 +1,51 @@ +#ifndef LANDER_EVENT_LOOP_INTERNAL +#define LANDER_EVENT_LOOP_INTERNAL + +#include +#include + +#include "event_loop.h" + +typedef struct event_loop_conn { + int fd; + event_loop_conn_state state; + // buffer for reading + size_t rbuf_size; + uint8_t rbuf[EVENT_LOOP_BUFFER_SIZE]; + // buffer for writing + size_t wbuf_size; + size_t wbuf_sent; + uint8_t wbuf[EVENT_LOOP_BUFFER_SIZE]; + void (*process_func)(struct event_loop_conn *); +} event_loop_conn; + +/* + * Initialize a new event_loop_conn struct + */ +event_loop_conn *event_loop_conn_init(); + +typedef struct event_loop { + event_loop_conn **connections; + size_t connection_count; +} event_loop; + +/* + * Initialize a new event_loop struct + */ +event_loop *event_loop_init(); + +/* + * Place a new connection into the event loop's internal array. + * + * Returns -1 if the internal realloc failed + */ +int event_loop_put(event_loop *loop, event_loop_conn *conn); + +/** + * Accept a new connection for the given file descriptor. + */ +int event_loop_accept(event_loop *loop, int fd); + +void event_loop_conn_io(event_loop_conn *conn); + +#endif diff --git a/src/main.c b/src/main.c new file mode 100644 index 0000000..57e0881 --- /dev/null +++ b/src/main.c @@ -0,0 +1,11 @@ +#include + +#include "event_loop.h" + +int main() { + setvbuf(stdout, NULL, _IONBF, 0); + + event_loop *el = event_loop_init(); + + event_loop_run(el, 8000); +} diff --git a/include/picohttpparser.h b/thirdparty/include/picohttpparser.h similarity index 100% rename from include/picohttpparser.h rename to thirdparty/include/picohttpparser.h diff --git a/src/picohttpparser.c b/thirdparty/src/picohttpparser.c similarity index 100% rename from src/picohttpparser.c rename to thirdparty/src/picohttpparser.c