Compare commits
2 Commits
64601b5f21
...
1f63f06e0c
Author | SHA1 | Date |
---|---|---|
Jef Roosens | 1f63f06e0c | |
Jef Roosens | 1b8ba305b5 |
|
@ -27,12 +27,12 @@ int main() {
|
|||
ctx_reset,
|
||||
ctx_free);
|
||||
|
||||
lnm_http_step_init(&step, slow_step);
|
||||
lnm_http_step_append(&step, slow_step, true);
|
||||
lnm_http_route_init_literal(&route, lnm_http_method_get, "/", step);
|
||||
lnm_http_loop_route_add(hl, route);
|
||||
|
||||
lnm_log_init_global();
|
||||
lnm_log_register_stdout(lnm_log_level_debug);
|
||||
|
||||
printf("res = %i\n", lnm_http_loop_run(hl, 8080, 1, 32));
|
||||
printf("res = %i\n", lnm_http_loop_run(hl, 8080, 1, 2));
|
||||
}
|
||||
|
|
|
@ -32,23 +32,16 @@ lnm_err lnm_http_loop_init(lnm_http_loop **out, void *c_gctx,
|
|||
lnm_http_ctx_reset_fn ctx_reset,
|
||||
lnm_http_ctx_free_fn ctx_free);
|
||||
|
||||
/**
|
||||
* Initialize a new step.
|
||||
*
|
||||
* @param out where to store pointer to new `lnm_http_step`
|
||||
* @param fn step function
|
||||
*/
|
||||
lnm_err lnm_http_step_init(lnm_http_step **out, lnm_http_step_fn fn);
|
||||
|
||||
/**
|
||||
* Append the given step fn to the step.
|
||||
*
|
||||
* @param out where to store pointer to new `lnm_http_step`
|
||||
* @param step step to append new step to
|
||||
* @param out both the previous step to append the new step to, and the output
|
||||
* variable to which the new step is appended
|
||||
* @param fn step function
|
||||
* @param blocking whether the step is blocking or not
|
||||
*/
|
||||
lnm_err lnm_http_step_append(lnm_http_step **out, lnm_http_step *step,
|
||||
lnm_http_step_fn fn);
|
||||
lnm_err lnm_http_step_append(lnm_http_step **out, lnm_http_step_fn fn,
|
||||
bool blocking);
|
||||
|
||||
/**
|
||||
* Initialize a new route of type literal.
|
||||
|
|
|
@ -12,11 +12,11 @@
|
|||
#define LNM_QUEUE_MULTIPLIER 8
|
||||
|
||||
typedef enum lnm_loop_state {
|
||||
lnm_loop_state_req = 0,
|
||||
lnm_loop_state_res,
|
||||
lnm_loop_state_req_io = 0,
|
||||
lnm_loop_state_res_io,
|
||||
lnm_loop_state_end,
|
||||
lnm_loop_state_req_blocking,
|
||||
lnm_loop_state_res_blocking,
|
||||
lnm_loop_state_req_work,
|
||||
lnm_loop_state_res_work,
|
||||
} lnm_loop_state;
|
||||
|
||||
/**
|
||||
|
@ -106,6 +106,26 @@ lnm_err lnm_loop_run(lnm_loop *l);
|
|||
lnm_err lnm_loop_run_multi(lnm_loop *l, size_t epoll_threads,
|
||||
size_t worker_threads);
|
||||
|
||||
/**
|
||||
* Advance the processing of the given connection.
|
||||
*
|
||||
* Behavior of this function depends on both the connection state and whether
|
||||
* worker threads are enabled.
|
||||
*
|
||||
* For IO states, this function will perform network I/O along with executing
|
||||
* the loop's respective processing functions.
|
||||
*
|
||||
* For work states, the respective processing functions are executed without
|
||||
* performing any network I/O. If no worker queue is present, this function
|
||||
* performs all blocking work until an I/O or the end state is reached. If there
|
||||
* is a worker queue present, only one block of work is done before exiting,
|
||||
* allowing further blocks of work to be scheduled on other worker threads.
|
||||
*
|
||||
* If no worker queue is present, this function will only exit once an I/O or
|
||||
* end state is reached.
|
||||
*/
|
||||
void lnm_loop_conn_advance(lnm_loop *l, lnm_loop_conn *conn);
|
||||
|
||||
/**
|
||||
* Reschedule the given connection, either on the event loop for network IO or
|
||||
* on a worker thread for blocking work. Connections are terminated as needed.
|
||||
|
|
|
@ -9,6 +9,6 @@ void lnm_loop_conn_free(lnm_loop *l, lnm_loop_conn *conn);
|
|||
|
||||
lnm_err lnm_loop_accept(lnm_loop *l);
|
||||
|
||||
void lnm_loop_conn_io(lnm_loop *l, lnm_loop_conn *conn);
|
||||
void lnm_loop_conn_advance(lnm_loop *l, lnm_loop_conn *conn);
|
||||
|
||||
#endif
|
||||
|
|
|
@ -28,7 +28,8 @@ lnm_err lnm_http_loop_init(lnm_http_loop **out, void *c_gctx,
|
|||
return lnm_err_ok;
|
||||
}
|
||||
|
||||
lnm_err lnm_http_step_init(lnm_http_step **out, lnm_http_step_fn fn) {
|
||||
lnm_err lnm_http_step_append(lnm_http_step **out, lnm_http_step_fn fn,
|
||||
bool blocking) {
|
||||
lnm_http_step *step = calloc(1, sizeof(lnm_http_step));
|
||||
|
||||
if (step == NULL) {
|
||||
|
@ -36,19 +37,14 @@ lnm_err lnm_http_step_init(lnm_http_step **out, lnm_http_step_fn fn) {
|
|||
}
|
||||
|
||||
step->fn = fn;
|
||||
*out = step;
|
||||
step->blocking = blocking;
|
||||
|
||||
return lnm_err_ok;
|
||||
}
|
||||
|
||||
lnm_err lnm_http_step_append(lnm_http_step **out, lnm_http_step *step,
|
||||
lnm_http_step_fn fn) {
|
||||
LNM_RES(lnm_http_step_init(out, fn));
|
||||
|
||||
if (step != NULL) {
|
||||
step->next = *out;
|
||||
if ((*out) != NULL) {
|
||||
(*out)->next = step;
|
||||
}
|
||||
|
||||
*out = step;
|
||||
|
||||
return lnm_err_ok;
|
||||
}
|
||||
|
||||
|
|
|
@ -133,8 +133,8 @@ void lnm_http_loop_process_steps(lnm_http_conn *conn) {
|
|||
while ((ctx->cur_step != NULL) && (step != ctx->cur_step)) {
|
||||
step = ctx->cur_step;
|
||||
|
||||
if (step->blocking && (conn->state != lnm_loop_state_req_blocking)) {
|
||||
conn->state = lnm_loop_state_req_blocking;
|
||||
if (step->blocking && (conn->state != lnm_loop_state_req_work)) {
|
||||
conn->state = lnm_loop_state_req_work;
|
||||
|
||||
break;
|
||||
}
|
||||
|
@ -145,7 +145,7 @@ void lnm_http_loop_process_steps(lnm_http_conn *conn) {
|
|||
break;
|
||||
case lnm_http_step_err_io_needed:
|
||||
// Ensure steps that require more I/O are executed on the event loop
|
||||
conn->state = lnm_loop_state_req;
|
||||
conn->state = lnm_loop_state_req_io;
|
||||
break;
|
||||
case lnm_http_step_err_close:
|
||||
conn->state = lnm_loop_state_end;
|
||||
|
@ -157,7 +157,7 @@ void lnm_http_loop_process_steps(lnm_http_conn *conn) {
|
|||
}
|
||||
|
||||
if (ctx->cur_step == NULL) {
|
||||
conn->state = lnm_loop_state_res;
|
||||
conn->state = lnm_loop_state_res_io;
|
||||
ctx->state = lnm_http_loop_state_add_headers;
|
||||
}
|
||||
}
|
||||
|
@ -334,23 +334,23 @@ void (*process_fns[])(lnm_http_conn *conn) = {
|
|||
|
||||
lnm_loop_state state_map[] = {
|
||||
// parse_req
|
||||
lnm_loop_state_req,
|
||||
lnm_loop_state_req_io,
|
||||
// route
|
||||
lnm_loop_state_req,
|
||||
lnm_loop_state_req_io,
|
||||
// parse_headers
|
||||
lnm_loop_state_req,
|
||||
lnm_loop_state_req_io,
|
||||
// steps
|
||||
lnm_loop_state_req,
|
||||
lnm_loop_state_req_io,
|
||||
// add_headers
|
||||
lnm_loop_state_req,
|
||||
lnm_loop_state_req_io,
|
||||
// write_status_line
|
||||
lnm_loop_state_res,
|
||||
lnm_loop_state_res_io,
|
||||
// write_headers
|
||||
lnm_loop_state_res,
|
||||
lnm_loop_state_res_io,
|
||||
// write_body
|
||||
lnm_loop_state_res,
|
||||
lnm_loop_state_res_io,
|
||||
// finish
|
||||
lnm_loop_state_res,
|
||||
lnm_loop_state_res_io,
|
||||
};
|
||||
|
||||
void lnm_http_loop_process(lnm_http_conn *conn) {
|
||||
|
@ -378,7 +378,7 @@ void lnm_http_loop_process(lnm_http_conn *conn) {
|
|||
|
||||
// We move the request to a dedicated buffer if the read buffer needs to be
|
||||
// reused
|
||||
if ((conn->state == lnm_loop_state_req) && (conn->state == loop_state) &&
|
||||
if ((conn->state == lnm_loop_state_req_io) && (conn->state == loop_state) &&
|
||||
(!ctx->req.buf.owned) && (ctx->req.buf.len > 0)) {
|
||||
char *buf = malloc(ctx->req.buf.len);
|
||||
|
||||
|
|
|
@ -55,7 +55,7 @@ lnm_err lnm_loop_accept(lnm_loop *l) {
|
|||
LNM_RES2(lnm_loop_conn_init(&conn, l), close(conn_fd));
|
||||
|
||||
conn->fd = conn_fd;
|
||||
conn->state = lnm_loop_state_req;
|
||||
conn->state = lnm_loop_state_req_io;
|
||||
|
||||
struct epoll_event event = {.data.ptr = conn,
|
||||
.events = EPOLLIN | EPOLLET | EPOLLONESHOT};
|
||||
|
@ -132,17 +132,17 @@ lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port) {
|
|||
void lnm_loop_conn_schedule(lnm_loop *l, lnm_loop_conn *conn) {
|
||||
switch (conn->state) {
|
||||
// IO states get rescheduled in the epoll loop
|
||||
case lnm_loop_state_req:
|
||||
case lnm_loop_state_res: {
|
||||
case lnm_loop_state_req_io:
|
||||
case lnm_loop_state_res_io: {
|
||||
struct epoll_event event = {
|
||||
.data.ptr = conn,
|
||||
.events = (conn->state == lnm_loop_state_req ? EPOLLIN : EPOLLOUT) |
|
||||
.events = (conn->state == lnm_loop_state_req_io ? EPOLLIN : EPOLLOUT) |
|
||||
EPOLLET | EPOLLONESHOT};
|
||||
|
||||
epoll_ctl(l->epoll_fd, EPOLL_CTL_MOD, conn->fd, &event);
|
||||
} break;
|
||||
case lnm_loop_state_req_blocking:
|
||||
case lnm_loop_state_res_blocking:
|
||||
case lnm_loop_state_req_work:
|
||||
case lnm_loop_state_res_work:
|
||||
lnm_loop_queue_push(l->wq, conn);
|
||||
break;
|
||||
case lnm_loop_state_end: {
|
||||
|
@ -196,7 +196,7 @@ lnm_err lnm_loop_run(lnm_loop *l) {
|
|||
lnm_loop_conn *conn = events[i].data.ptr;
|
||||
|
||||
// At this point, state is always an IO state
|
||||
lnm_loop_conn_io(l, conn);
|
||||
lnm_loop_conn_advance(l, conn);
|
||||
lnm_loop_conn_schedule(l, conn);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
#include <errno.h>
|
||||
#include <string.h>
|
||||
#include <unistd.h>
|
||||
#include <sys/socket.h>
|
||||
#include <unistd.h>
|
||||
|
||||
#include "lnm/loop.h"
|
||||
#include "lnm/loop_internal.h"
|
||||
|
@ -34,7 +34,7 @@ void lnm_loop_conn_io_req(lnm_loop *l, lnm_loop_conn *conn) {
|
|||
|
||||
conn->r.size += res;
|
||||
l->data_read(conn);
|
||||
} while (conn->state == lnm_loop_state_req);
|
||||
} while (conn->state == lnm_loop_state_req_io);
|
||||
}
|
||||
|
||||
void lnm_loop_conn_io_res(lnm_loop *l, lnm_loop_conn *conn) {
|
||||
|
@ -64,17 +64,33 @@ void lnm_loop_conn_io_res(lnm_loop *l, lnm_loop_conn *conn) {
|
|||
// writer function more space to work with
|
||||
memmove(conn->w.buf, &conn->w.buf[res], conn->w.size - res);
|
||||
conn->w.size -= res;
|
||||
} while (conn->state == lnm_loop_state_res);
|
||||
} while (conn->state == lnm_loop_state_res_io);
|
||||
}
|
||||
|
||||
void lnm_loop_conn_io(lnm_loop *l, lnm_loop_conn *conn) {
|
||||
switch (conn->state) {
|
||||
case lnm_loop_state_req:
|
||||
lnm_loop_conn_io_req(l, conn);
|
||||
break;
|
||||
case lnm_loop_state_res:
|
||||
lnm_loop_conn_io_res(l, conn);
|
||||
break;
|
||||
default:;
|
||||
void lnm_loop_conn_advance(lnm_loop *l, lnm_loop_conn *conn) {
|
||||
do {
|
||||
|
||||
switch (conn->state) {
|
||||
case lnm_loop_state_req_io:
|
||||
lnm_loop_conn_io_req(l, conn);
|
||||
break;
|
||||
case lnm_loop_state_res_io:
|
||||
lnm_loop_conn_io_res(l, conn);
|
||||
break;
|
||||
case lnm_loop_state_req_work:
|
||||
do {
|
||||
l->data_read(conn);
|
||||
} while (conn->state == lnm_loop_state_req_work);
|
||||
break;
|
||||
case lnm_loop_state_res_work:
|
||||
do {
|
||||
l->data_write(conn);
|
||||
} while (conn->state == lnm_loop_state_res_work);
|
||||
break;
|
||||
default:;
|
||||
}
|
||||
}
|
||||
// Execute all blocking work if we're running in single-threaded mode
|
||||
while (l->wq == NULL && (conn->state == lnm_loop_state_req_work ||
|
||||
conn->state == lnm_loop_state_res_work));
|
||||
}
|
||||
|
|
|
@ -86,16 +86,7 @@ void lnm_loop_worker_run(void *arg) {
|
|||
lnm_loop_conn *conn = lnm_loop_queue_pop(q);
|
||||
lnm_ldebug("loop", "worker %i processing fd %i", thread_id, conn->fd);
|
||||
|
||||
switch (conn->state) {
|
||||
case lnm_loop_state_req_blocking:
|
||||
l->data_read(conn);
|
||||
break;
|
||||
case lnm_loop_state_res_blocking:
|
||||
l->data_write(conn);
|
||||
// Other states shouldn't even end up here, so we ignore them
|
||||
default:;
|
||||
}
|
||||
|
||||
lnm_loop_conn_advance(l, conn);
|
||||
lnm_loop_conn_schedule(l, conn);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue