diff --git a/include/lnm/loop.h b/include/lnm/loop.h index 7ca372c..4c33aa5 100644 --- a/include/lnm/loop.h +++ b/include/lnm/loop.h @@ -1,6 +1,7 @@ #ifndef LNM_LOOP #define LNM_LOOP +#include #include #include #include @@ -51,4 +52,35 @@ lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port); lnm_err lnm_loop_run(lnm_loop *l, int thread_count); +/** + * Concurrent fixed-size queue used by the worked threads to distribute tasks + */ +typedef struct lnm_loop_queue { + struct { + lnm_loop_conn **arr; + size_t len; + } buf; + size_t head; + size_t tail; + pthread_mutex_t mutex; + pthread_cond_t cond; +} lnm_loop_queue; + +/** + * Initialize a new queue with the specified fixed capacity + */ +lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap); + +/** + * Queue the given connection. If the queue is currently full, this action + * blocks until there is space available. + */ +void lnm_loop_queue_push(lnm_loop_queue *q, lnm_loop_conn *conn); + +/** + * Pop a connection from the queue. This action blocks until a connection is + * available. + */ +lnm_loop_conn *lnm_loop_queue_pop(lnm_loop_queue *q); + #endif diff --git a/src/loop/lnm_loop_worker.c b/src/loop/lnm_loop_worker.c new file mode 100644 index 0000000..067f1e8 --- /dev/null +++ b/src/loop/lnm_loop_worker.c @@ -0,0 +1,61 @@ +#include "lnm/loop.h" + +lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap) { + lnm_loop_conn **arr = malloc(cap * sizeof(lnm_loop_conn *)); + + if (arr == NULL) { + return lnm_err_failed_alloc; + } + + lnm_loop_queue *q = calloc(1, sizeof(lnm_loop_queue)); + + if (q == NULL) { + free(arr); + + return lnm_err_failed_alloc; + } + + q->buf.arr = arr; + q->buf.len = cap; + + pthread_mutex_init(&q->mutex, NULL); + pthread_cond_init(&q->cond, NULL); + + *out = q; + + return lnm_err_ok; +} + +void lnm_loop_queue_push(lnm_loop_queue *q, lnm_loop_conn *conn) { + pthread_mutex_lock(&q->mutex); + + while (q->tail == q->head) { + pthread_cond_wait(&q->cond, &q->mutex); + } + + q->buf.arr[q->tail] = conn; + + // Make sure the index wraps around + q->tail = (q->tail + 1) % q->buf.len; + + // Unlock mutex and signal to waiting threads + pthread_mutex_unlock(&q->mutex); + pthread_cond_signal(&q->cond); +} + +lnm_loop_conn *lnm_loop_queue_pop(lnm_loop_queue *q) { + pthread_mutex_lock(&q->mutex); + + while (q->tail == q->head) { + pthread_cond_wait(&q->cond, &q->mutex); + } + + lnm_loop_conn *out = q->buf.arr[q->head]; + q->head = (q->head + 1) % q->buf.len; + + // Unlock mutex and signal to waiting threads + pthread_mutex_unlock(&q->mutex); + pthread_cond_signal(&q->cond); + + return out; +}