fix: transparently support blocking work without worker threads

blocking
Jef Roosens 2024-02-14 12:43:58 +01:00
parent 1b8ba305b5
commit 1f63f06e0c
Signed by: Jef Roosens
GPG Key ID: B75D4F293C7052DB
8 changed files with 62 additions and 46 deletions

View File

@ -27,12 +27,12 @@ int main() {
ctx_reset, ctx_reset,
ctx_free); ctx_free);
lnm_http_step_init(&step, slow_step); lnm_http_step_append(&step, slow_step, true);
lnm_http_route_init_literal(&route, lnm_http_method_get, "/", step); lnm_http_route_init_literal(&route, lnm_http_method_get, "/", step);
lnm_http_loop_route_add(hl, route); lnm_http_loop_route_add(hl, route);
lnm_log_init_global(); lnm_log_init_global();
lnm_log_register_stdout(lnm_log_level_debug); lnm_log_register_stdout(lnm_log_level_debug);
printf("res = %i\n", lnm_http_loop_run(hl, 8080, 1, 32)); printf("res = %i\n", lnm_http_loop_run(hl, 8080, 1, 2));
} }

View File

@ -32,23 +32,16 @@ lnm_err lnm_http_loop_init(lnm_http_loop **out, void *c_gctx,
lnm_http_ctx_reset_fn ctx_reset, lnm_http_ctx_reset_fn ctx_reset,
lnm_http_ctx_free_fn ctx_free); lnm_http_ctx_free_fn ctx_free);
/**
* Initialize a new step.
*
* @param out where to store pointer to new `lnm_http_step`
* @param fn step function
*/
lnm_err lnm_http_step_init(lnm_http_step **out, lnm_http_step_fn fn);
/** /**
* Append the given step fn to the step. * Append the given step fn to the step.
* *
* @param out where to store pointer to new `lnm_http_step` * @param out both the previous step to append the new step to, and the output
* @param step step to append new step to * variable to which the new step is appended
* @param fn step function * @param fn step function
* @param blocking whether the step is blocking or not
*/ */
lnm_err lnm_http_step_append(lnm_http_step **out, lnm_http_step *step, lnm_err lnm_http_step_append(lnm_http_step **out, lnm_http_step_fn fn,
lnm_http_step_fn fn); bool blocking);
/** /**
* Initialize a new route of type literal. * Initialize a new route of type literal.

View File

@ -106,6 +106,26 @@ lnm_err lnm_loop_run(lnm_loop *l);
lnm_err lnm_loop_run_multi(lnm_loop *l, size_t epoll_threads, lnm_err lnm_loop_run_multi(lnm_loop *l, size_t epoll_threads,
size_t worker_threads); size_t worker_threads);
/**
* Advance the processing of the given connection.
*
* Behavior of this function depends on both the connection state and whether
* worker threads are enabled.
*
* For IO states, this function will perform network I/O along with executing
* the loop's respective processing functions.
*
* For work states, the respective processing functions are executed without
* performing any network I/O. If no worker queue is present, this function
* performs all blocking work until an I/O or the end state is reached. If there
* is a worker queue present, only one block of work is done before exiting,
* allowing further blocks of work to be scheduled on other worker threads.
*
* If no worker queue is present, this function will only exit once an I/O or
* end state is reached.
*/
void lnm_loop_conn_advance(lnm_loop *l, lnm_loop_conn *conn);
/** /**
* Reschedule the given connection, either on the event loop for network IO or * Reschedule the given connection, either on the event loop for network IO or
* on a worker thread for blocking work. Connections are terminated as needed. * on a worker thread for blocking work. Connections are terminated as needed.

View File

@ -9,6 +9,6 @@ void lnm_loop_conn_free(lnm_loop *l, lnm_loop_conn *conn);
lnm_err lnm_loop_accept(lnm_loop *l); lnm_err lnm_loop_accept(lnm_loop *l);
void lnm_loop_conn_io(lnm_loop *l, lnm_loop_conn *conn); void lnm_loop_conn_advance(lnm_loop *l, lnm_loop_conn *conn);
#endif #endif

View File

@ -28,7 +28,8 @@ lnm_err lnm_http_loop_init(lnm_http_loop **out, void *c_gctx,
return lnm_err_ok; return lnm_err_ok;
} }
lnm_err lnm_http_step_init(lnm_http_step **out, lnm_http_step_fn fn) { lnm_err lnm_http_step_append(lnm_http_step **out, lnm_http_step_fn fn,
bool blocking) {
lnm_http_step *step = calloc(1, sizeof(lnm_http_step)); lnm_http_step *step = calloc(1, sizeof(lnm_http_step));
if (step == NULL) { if (step == NULL) {
@ -36,22 +37,17 @@ lnm_err lnm_http_step_init(lnm_http_step **out, lnm_http_step_fn fn) {
} }
step->fn = fn; step->fn = fn;
step->blocking = blocking;
if ((*out) != NULL) {
(*out)->next = step;
}
*out = step; *out = step;
return lnm_err_ok; return lnm_err_ok;
} }
lnm_err lnm_http_step_append(lnm_http_step **out, lnm_http_step *step,
lnm_http_step_fn fn) {
LNM_RES(lnm_http_step_init(out, fn));
if (step != NULL) {
step->next = *out;
}
return lnm_err_ok;
}
lnm_err lnm_http_route_init(lnm_http_route **out) { lnm_err lnm_http_route_init(lnm_http_route **out) {
lnm_http_route *route = calloc(1, sizeof(lnm_http_route)); lnm_http_route *route = calloc(1, sizeof(lnm_http_route));

View File

@ -196,7 +196,7 @@ lnm_err lnm_loop_run(lnm_loop *l) {
lnm_loop_conn *conn = events[i].data.ptr; lnm_loop_conn *conn = events[i].data.ptr;
// At this point, state is always an IO state // At this point, state is always an IO state
lnm_loop_conn_io(l, conn); lnm_loop_conn_advance(l, conn);
lnm_loop_conn_schedule(l, conn); lnm_loop_conn_schedule(l, conn);
} }
} }

View File

@ -67,7 +67,9 @@ void lnm_loop_conn_io_res(lnm_loop *l, lnm_loop_conn *conn) {
} while (conn->state == lnm_loop_state_res_io); } while (conn->state == lnm_loop_state_res_io);
} }
void lnm_loop_conn_io(lnm_loop *l, lnm_loop_conn *conn) { void lnm_loop_conn_advance(lnm_loop *l, lnm_loop_conn *conn) {
do {
switch (conn->state) { switch (conn->state) {
case lnm_loop_state_req_io: case lnm_loop_state_req_io:
lnm_loop_conn_io_req(l, conn); lnm_loop_conn_io_req(l, conn);
@ -75,6 +77,20 @@ void lnm_loop_conn_io(lnm_loop *l, lnm_loop_conn *conn) {
case lnm_loop_state_res_io: case lnm_loop_state_res_io:
lnm_loop_conn_io_res(l, conn); lnm_loop_conn_io_res(l, conn);
break; break;
case lnm_loop_state_req_work:
do {
l->data_read(conn);
} while (conn->state == lnm_loop_state_req_work);
break;
case lnm_loop_state_res_work:
do {
l->data_write(conn);
} while (conn->state == lnm_loop_state_res_work);
break;
default:; default:;
} }
} }
// Execute all blocking work if we're running in single-threaded mode
while (l->wq == NULL && (conn->state == lnm_loop_state_req_work ||
conn->state == lnm_loop_state_res_work));
}

View File

@ -86,16 +86,7 @@ void lnm_loop_worker_run(void *arg) {
lnm_loop_conn *conn = lnm_loop_queue_pop(q); lnm_loop_conn *conn = lnm_loop_queue_pop(q);
lnm_ldebug("loop", "worker %i processing fd %i", thread_id, conn->fd); lnm_ldebug("loop", "worker %i processing fd %i", thread_id, conn->fd);
switch (conn->state) { lnm_loop_conn_advance(l, conn);
case lnm_loop_state_req_work:
l->data_read(conn);
break;
case lnm_loop_state_res_work:
l->data_write(conn);
// Other states shouldn't even end up here, so we ignore them
default:;
}
lnm_loop_conn_schedule(l, conn); lnm_loop_conn_schedule(l, conn);
} }
} }