#include #include #include #include #include #include #include "lnm/common.h" #include "lnm/loop_internal.h" lnm_err lnm_loop_init(lnm_loop **out, void *gctx, lnm_err (*ctx_init)(void **out, void *gctx), void (*ctx_free)(void *ctx), void (*data_read)(lnm_loop_conn *conn), void (*data_write)(lnm_loop_conn *conn)) { lnm_loop *l = calloc(1, sizeof(lnm_loop)); if (l == NULL) { return lnm_err_failed_alloc; } l->gctx = gctx; l->ctx_init = ctx_init; l->ctx_free = ctx_free; l->data_read = data_read; l->data_write = data_write; *out = l; return lnm_err_ok; } lnm_err lnm_loop_accept(lnm_loop *l) { int conn_fd = accept(l->listen_fd, NULL, NULL); if (conn_fd < 0) { return lnm_err_failed_network; } // Set socket to non-blocking int flags = fcntl(conn_fd, F_GETFL); flags |= O_NONBLOCK; fcntl(conn_fd, F_SETFL, flags); // Append connection to list of connections if ((size_t)conn_fd >= l->conns.len) { // We always calloc as a realloc might introduce unitialized values in the // array lnm_loop_conn **new = calloc(sizeof(lnm_loop_conn *), conn_fd + 1); if (new == NULL) { close(conn_fd); return lnm_err_failed_alloc; } if (l->conns.len > 0) { memcpy(new, l->conns.arr, l->conns.len * sizeof(lnm_loop_conn *)); free(l->conns.arr); } l->conns.arr = new; l->conns.len = conn_fd + 1; } lnm_loop_conn *conn; LNM_RES2(lnm_loop_conn_init(&conn, l), close(conn_fd)); l->conns.arr[conn_fd] = conn; conn->fd = conn_fd; conn->state = lnm_loop_state_req; l->conns.open++; return lnm_err_ok; } lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port) { int listen_fd = socket(AF_INET, SOCK_STREAM, 0); if (listen_fd < 0) { return lnm_err_failed_network; } int val = 1; int res = setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &val, sizeof(int)); if (res < 0) { return lnm_err_failed_network; } struct sockaddr_in addr = {.sin_family = AF_INET, .sin_port = ntohs(port), .sin_addr.s_addr = ntohl(0)}; res = bind(listen_fd, (const struct sockaddr *)&addr, sizeof(addr)); if (res < 0) { return lnm_err_failed_network; } res = listen(listen_fd, SOMAXCONN); if (res < 0) { return lnm_err_failed_network; } int flags = fcntl(listen_fd, F_GETFL); fcntl(listen_fd, F_SETFL, flags | O_NONBLOCK); l->listen_fd = listen_fd; return lnm_err_ok; } lnm_err lnm_loop_run(lnm_loop *l) { if (l->listen_fd == 0) { return lnm_err_not_setup; } struct pollfd *poll_args = calloc(LNM_LOOP_INITIAL_CONNS + 1, sizeof(struct pollfd)); size_t poll_args_cap = LNM_LOOP_INITIAL_CONNS + 1; if (poll_args == NULL) { return lnm_err_failed_alloc; } // First argument is listening socket poll_args[0].fd = l->listen_fd; poll_args[0].events = POLLIN; while (1) { nfds_t poll_args_len = 1; // Add all open connections to the poll command for (size_t i = 0; i < l->conns.len && poll_args_len < l->conns.open + 1; i++) { const lnm_loop_conn *conn = l->conns.arr[i]; if (conn == NULL) { continue; } poll_args[poll_args_len].fd = conn->fd; poll_args[poll_args_len].events = ((conn->state == lnm_loop_state_req) ? POLLIN : POLLOUT) | POLLERR; poll_args_len++; } int polled = poll(poll_args, poll_args_len, -1); if (polled < 0) { return lnm_err_failed_poll; } if (poll_args[0].revents) { lnm_loop_accept(l); polled--; } for (size_t i = 1; i < poll_args_len && polled > 0; i++) { if (poll_args[i].revents) { lnm_loop_conn *conn = l->conns.arr[poll_args[i].fd]; lnm_loop_conn_io(l, conn); if (conn->state == lnm_loop_state_end) { l->conns.arr[conn->fd] = NULL; close(conn->fd); l->conns.open--; lnm_loop_conn_free(l, conn); } polled--; } } if (poll_args_cap < l->conns.open + 1) { struct pollfd *buf = calloc(l->conns.open + 1, sizeof(struct pollfd)); if (buf == NULL) { return lnm_err_failed_alloc; } buf[0].fd = l->listen_fd; buf[0].events = POLLIN; free(poll_args); poll_args = buf; poll_args_cap = l->conns.open + 1; } } return lnm_err_ok; }