feat: add blocking step support to http loop
parent
9c01548256
commit
a96ae39523
|
@ -34,5 +34,5 @@ int main() {
|
||||||
lnm_log_init_global();
|
lnm_log_init_global();
|
||||||
lnm_log_register_stdout(lnm_log_level_debug);
|
lnm_log_register_stdout(lnm_log_level_debug);
|
||||||
|
|
||||||
printf("res = %i\n", lnm_http_loop_run(hl, 8080, 1, 1));
|
printf("res = %i\n", lnm_http_loop_run(hl, 8080, 1, 32));
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,6 +47,7 @@ typedef struct lnm_loop_queue {
|
||||||
} buf;
|
} buf;
|
||||||
size_t head;
|
size_t head;
|
||||||
size_t tail;
|
size_t tail;
|
||||||
|
bool empty;
|
||||||
pthread_mutex_t mutex;
|
pthread_mutex_t mutex;
|
||||||
pthread_cond_t cond;
|
pthread_cond_t cond;
|
||||||
} lnm_loop_queue;
|
} lnm_loop_queue;
|
||||||
|
|
|
@ -8,6 +8,7 @@
|
||||||
typedef struct lnm_http_step {
|
typedef struct lnm_http_step {
|
||||||
lnm_http_step_fn fn;
|
lnm_http_step_fn fn;
|
||||||
struct lnm_http_step *next;
|
struct lnm_http_step *next;
|
||||||
|
bool blocking;
|
||||||
} lnm_http_step;
|
} lnm_http_step;
|
||||||
|
|
||||||
typedef enum lnm_http_route_type {
|
typedef enum lnm_http_route_type {
|
||||||
|
|
|
@ -133,11 +133,19 @@ void lnm_http_loop_process_steps(lnm_http_conn *conn) {
|
||||||
while ((ctx->cur_step != NULL) && (step != ctx->cur_step)) {
|
while ((ctx->cur_step != NULL) && (step != ctx->cur_step)) {
|
||||||
step = ctx->cur_step;
|
step = ctx->cur_step;
|
||||||
|
|
||||||
|
if (step->blocking && (conn->state != lnm_loop_state_req_blocking)) {
|
||||||
|
conn->state = lnm_loop_state_req_blocking;
|
||||||
|
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
switch (step->fn(conn)) {
|
switch (step->fn(conn)) {
|
||||||
case lnm_http_step_err_done:
|
case lnm_http_step_err_done:
|
||||||
ctx->cur_step = ctx->cur_step->next;
|
ctx->cur_step = ctx->cur_step->next;
|
||||||
break;
|
break;
|
||||||
case lnm_http_step_err_io_needed:
|
case lnm_http_step_err_io_needed:
|
||||||
|
// Ensure steps that require more I/O are executed on the event loop
|
||||||
|
conn->state = lnm_loop_state_req;
|
||||||
break;
|
break;
|
||||||
case lnm_http_step_err_close:
|
case lnm_http_step_err_close:
|
||||||
conn->state = lnm_loop_state_end;
|
conn->state = lnm_loop_state_end;
|
||||||
|
@ -149,6 +157,7 @@ void lnm_http_loop_process_steps(lnm_http_conn *conn) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ctx->cur_step == NULL) {
|
if (ctx->cur_step == NULL) {
|
||||||
|
conn->state = lnm_loop_state_res;
|
||||||
ctx->state = lnm_http_loop_state_add_headers;
|
ctx->state = lnm_http_loop_state_add_headers;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,9 +1,10 @@
|
||||||
#include <sys/epoll.h>
|
#include <sys/epoll.h>
|
||||||
|
|
||||||
|
#include "lnm/log.h"
|
||||||
#include "lnm/loop.h"
|
#include "lnm/loop.h"
|
||||||
|
|
||||||
lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap) {
|
lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap) {
|
||||||
lnm_loop_conn **arr = malloc(cap * sizeof(lnm_loop_conn *));
|
lnm_loop_conn **arr = calloc(cap, sizeof(lnm_loop_conn *));
|
||||||
|
|
||||||
if (arr == NULL) {
|
if (arr == NULL) {
|
||||||
return lnm_err_failed_alloc;
|
return lnm_err_failed_alloc;
|
||||||
|
@ -20,6 +21,10 @@ lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap) {
|
||||||
q->buf.arr = arr;
|
q->buf.arr = arr;
|
||||||
q->buf.len = cap;
|
q->buf.len = cap;
|
||||||
|
|
||||||
|
q->tail = 0;
|
||||||
|
q->head = 0;
|
||||||
|
q->empty = true;
|
||||||
|
|
||||||
pthread_mutex_init(&q->mutex, NULL);
|
pthread_mutex_init(&q->mutex, NULL);
|
||||||
pthread_cond_init(&q->cond, NULL);
|
pthread_cond_init(&q->cond, NULL);
|
||||||
|
|
||||||
|
@ -31,14 +36,15 @@ lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap) {
|
||||||
void lnm_loop_queue_push(lnm_loop_queue *q, lnm_loop_conn *conn) {
|
void lnm_loop_queue_push(lnm_loop_queue *q, lnm_loop_conn *conn) {
|
||||||
pthread_mutex_lock(&q->mutex);
|
pthread_mutex_lock(&q->mutex);
|
||||||
|
|
||||||
while (q->tail == q->head) {
|
while (q->head == q->tail && !q->empty) {
|
||||||
pthread_cond_wait(&q->cond, &q->mutex);
|
pthread_cond_wait(&q->cond, &q->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
q->buf.arr[q->tail] = conn;
|
q->buf.arr[q->head] = conn;
|
||||||
|
|
||||||
// Make sure the index wraps around
|
// Make sure the index wraps around
|
||||||
q->tail = (q->tail + 1) % q->buf.len;
|
q->head = (q->head + 1) % q->buf.len;
|
||||||
|
q->empty = false;
|
||||||
|
|
||||||
// Unlock mutex and signal to waiting threads
|
// Unlock mutex and signal to waiting threads
|
||||||
pthread_mutex_unlock(&q->mutex);
|
pthread_mutex_unlock(&q->mutex);
|
||||||
|
@ -48,12 +54,14 @@ void lnm_loop_queue_push(lnm_loop_queue *q, lnm_loop_conn *conn) {
|
||||||
lnm_loop_conn *lnm_loop_queue_pop(lnm_loop_queue *q) {
|
lnm_loop_conn *lnm_loop_queue_pop(lnm_loop_queue *q) {
|
||||||
pthread_mutex_lock(&q->mutex);
|
pthread_mutex_lock(&q->mutex);
|
||||||
|
|
||||||
while (q->tail == q->head) {
|
while (q->empty) {
|
||||||
pthread_cond_wait(&q->cond, &q->mutex);
|
pthread_cond_wait(&q->cond, &q->mutex);
|
||||||
}
|
}
|
||||||
|
|
||||||
lnm_loop_conn *out = q->buf.arr[q->head];
|
lnm_loop_conn *out = q->buf.arr[q->tail];
|
||||||
q->head = (q->head + 1) % q->buf.len;
|
|
||||||
|
q->tail = (q->tail + 1) % q->buf.len;
|
||||||
|
q->empty = q->tail == q->head;
|
||||||
|
|
||||||
// Unlock mutex and signal to waiting threads
|
// Unlock mutex and signal to waiting threads
|
||||||
pthread_mutex_unlock(&q->mutex);
|
pthread_mutex_unlock(&q->mutex);
|
||||||
|
@ -66,8 +74,17 @@ void lnm_loop_worker_run(void *arg) {
|
||||||
lnm_loop *l = arg;
|
lnm_loop *l = arg;
|
||||||
lnm_loop_queue *q = l->wq;
|
lnm_loop_queue *q = l->wq;
|
||||||
|
|
||||||
|
// Get thread ID by incrementing counter
|
||||||
|
pthread_mutex_lock(&l->threads.mutex);
|
||||||
|
|
||||||
|
int thread_id = l->threads.worker_count;
|
||||||
|
l->threads.worker_count++;
|
||||||
|
|
||||||
|
pthread_mutex_unlock(&l->threads.mutex);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
lnm_loop_conn *conn = lnm_loop_queue_pop(q);
|
lnm_loop_conn *conn = lnm_loop_queue_pop(q);
|
||||||
|
lnm_ldebug("loop", "worker %i processing fd %i", thread_id, conn->fd);
|
||||||
|
|
||||||
switch (conn->state) {
|
switch (conn->state) {
|
||||||
case lnm_loop_state_req_blocking:
|
case lnm_loop_state_req_blocking:
|
||||||
|
|
Loading…
Reference in New Issue