diff --git a/example/blocking.c b/example/blocking.c index 557bc87..a145ed5 100644 --- a/example/blocking.c +++ b/example/blocking.c @@ -27,12 +27,12 @@ int main() { ctx_reset, ctx_free); - lnm_http_step_init(&step, slow_step); + lnm_http_step_append(&step, slow_step, true); lnm_http_route_init_literal(&route, lnm_http_method_get, "/", step); lnm_http_loop_route_add(hl, route); lnm_log_init_global(); lnm_log_register_stdout(lnm_log_level_debug); - printf("res = %i\n", lnm_http_loop_run(hl, 8080, 1, 32)); + printf("res = %i\n", lnm_http_loop_run(hl, 8080, 1, 2)); } diff --git a/include/lnm/http/loop.h b/include/lnm/http/loop.h index 2427179..fefdf6a 100644 --- a/include/lnm/http/loop.h +++ b/include/lnm/http/loop.h @@ -32,23 +32,16 @@ lnm_err lnm_http_loop_init(lnm_http_loop **out, void *c_gctx, lnm_http_ctx_reset_fn ctx_reset, lnm_http_ctx_free_fn ctx_free); -/** - * Initialize a new step. - * - * @param out where to store pointer to new `lnm_http_step` - * @param fn step function - */ -lnm_err lnm_http_step_init(lnm_http_step **out, lnm_http_step_fn fn); - /** * Append the given step fn to the step. * - * @param out where to store pointer to new `lnm_http_step` - * @param step step to append new step to + * @param out both the previous step to append the new step to, and the output + * variable to which the new step is appended * @param fn step function + * @param blocking whether the step is blocking or not */ -lnm_err lnm_http_step_append(lnm_http_step **out, lnm_http_step *step, - lnm_http_step_fn fn); +lnm_err lnm_http_step_append(lnm_http_step **out, lnm_http_step_fn fn, + bool blocking); /** * Initialize a new route of type literal. diff --git a/include/lnm/loop.h b/include/lnm/loop.h index 09041bc..b922c1f 100644 --- a/include/lnm/loop.h +++ b/include/lnm/loop.h @@ -106,6 +106,26 @@ lnm_err lnm_loop_run(lnm_loop *l); lnm_err lnm_loop_run_multi(lnm_loop *l, size_t epoll_threads, size_t worker_threads); +/** + * Advance the processing of the given connection. + * + * Behavior of this function depends on both the connection state and whether + * worker threads are enabled. + * + * For IO states, this function will perform network I/O along with executing + * the loop's respective processing functions. + * + * For work states, the respective processing functions are executed without + * performing any network I/O. If no worker queue is present, this function + * performs all blocking work until an I/O or the end state is reached. If there + * is a worker queue present, only one block of work is done before exiting, + * allowing further blocks of work to be scheduled on other worker threads. + * + * If no worker queue is present, this function will only exit once an I/O or + * end state is reached. + */ +void lnm_loop_conn_advance(lnm_loop *l, lnm_loop_conn *conn); + /** * Reschedule the given connection, either on the event loop for network IO or * on a worker thread for blocking work. Connections are terminated as needed. diff --git a/src/_include/lnm/loop_internal.h b/src/_include/lnm/loop_internal.h index a5e70a8..c71f303 100644 --- a/src/_include/lnm/loop_internal.h +++ b/src/_include/lnm/loop_internal.h @@ -9,6 +9,6 @@ void lnm_loop_conn_free(lnm_loop *l, lnm_loop_conn *conn); lnm_err lnm_loop_accept(lnm_loop *l); -void lnm_loop_conn_io(lnm_loop *l, lnm_loop_conn *conn); +void lnm_loop_conn_advance(lnm_loop *l, lnm_loop_conn *conn); #endif diff --git a/src/http/lnm_http_loop.c b/src/http/lnm_http_loop.c index 153b88c..bd83526 100644 --- a/src/http/lnm_http_loop.c +++ b/src/http/lnm_http_loop.c @@ -28,7 +28,8 @@ lnm_err lnm_http_loop_init(lnm_http_loop **out, void *c_gctx, return lnm_err_ok; } -lnm_err lnm_http_step_init(lnm_http_step **out, lnm_http_step_fn fn) { +lnm_err lnm_http_step_append(lnm_http_step **out, lnm_http_step_fn fn, + bool blocking) { lnm_http_step *step = calloc(1, sizeof(lnm_http_step)); if (step == NULL) { @@ -36,19 +37,14 @@ lnm_err lnm_http_step_init(lnm_http_step **out, lnm_http_step_fn fn) { } step->fn = fn; - *out = step; + step->blocking = blocking; - return lnm_err_ok; -} - -lnm_err lnm_http_step_append(lnm_http_step **out, lnm_http_step *step, - lnm_http_step_fn fn) { - LNM_RES(lnm_http_step_init(out, fn)); - - if (step != NULL) { - step->next = *out; + if ((*out) != NULL) { + (*out)->next = step; } + *out = step; + return lnm_err_ok; } diff --git a/src/loop/lnm_loop.c b/src/loop/lnm_loop.c index de74902..5154b1e 100644 --- a/src/loop/lnm_loop.c +++ b/src/loop/lnm_loop.c @@ -196,7 +196,7 @@ lnm_err lnm_loop_run(lnm_loop *l) { lnm_loop_conn *conn = events[i].data.ptr; // At this point, state is always an IO state - lnm_loop_conn_io(l, conn); + lnm_loop_conn_advance(l, conn); lnm_loop_conn_schedule(l, conn); } } diff --git a/src/loop/lnm_loop_io.c b/src/loop/lnm_loop_io.c index 15fba24..58f5577 100644 --- a/src/loop/lnm_loop_io.c +++ b/src/loop/lnm_loop_io.c @@ -67,14 +67,30 @@ void lnm_loop_conn_io_res(lnm_loop *l, lnm_loop_conn *conn) { } while (conn->state == lnm_loop_state_res_io); } -void lnm_loop_conn_io(lnm_loop *l, lnm_loop_conn *conn) { - switch (conn->state) { - case lnm_loop_state_req_io: - lnm_loop_conn_io_req(l, conn); - break; - case lnm_loop_state_res_io: - lnm_loop_conn_io_res(l, conn); - break; - default:; +void lnm_loop_conn_advance(lnm_loop *l, lnm_loop_conn *conn) { + do { + + switch (conn->state) { + case lnm_loop_state_req_io: + lnm_loop_conn_io_req(l, conn); + break; + case lnm_loop_state_res_io: + lnm_loop_conn_io_res(l, conn); + break; + case lnm_loop_state_req_work: + do { + l->data_read(conn); + } while (conn->state == lnm_loop_state_req_work); + break; + case lnm_loop_state_res_work: + do { + l->data_write(conn); + } while (conn->state == lnm_loop_state_res_work); + break; + default:; + } } + // Execute all blocking work if we're running in single-threaded mode + while (l->wq == NULL && (conn->state == lnm_loop_state_req_work || + conn->state == lnm_loop_state_res_work)); } diff --git a/src/loop/lnm_loop_worker.c b/src/loop/lnm_loop_worker.c index 6d44406..fba4848 100644 --- a/src/loop/lnm_loop_worker.c +++ b/src/loop/lnm_loop_worker.c @@ -86,16 +86,7 @@ void lnm_loop_worker_run(void *arg) { lnm_loop_conn *conn = lnm_loop_queue_pop(q); lnm_ldebug("loop", "worker %i processing fd %i", thread_id, conn->fd); - switch (conn->state) { - case lnm_loop_state_req_work: - l->data_read(conn); - break; - case lnm_loop_state_res_work: - l->data_write(conn); - // Other states shouldn't even end up here, so we ignore them - default:; - } - + lnm_loop_conn_advance(l, conn); lnm_loop_conn_schedule(l, conn); } }