feat: implement concurrent worker queue
parent
854e8c3809
commit
3490c2dd42
|
@ -1,6 +1,7 @@
|
||||||
#ifndef LNM_LOOP
|
#ifndef LNM_LOOP
|
||||||
#define LNM_LOOP
|
#define LNM_LOOP
|
||||||
|
|
||||||
|
#include <pthread.h>
|
||||||
#include <stdatomic.h>
|
#include <stdatomic.h>
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
|
@ -51,4 +52,35 @@ lnm_err lnm_loop_setup(lnm_loop *l, uint16_t port);
|
||||||
|
|
||||||
lnm_err lnm_loop_run(lnm_loop *l, int thread_count);
|
lnm_err lnm_loop_run(lnm_loop *l, int thread_count);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Concurrent fixed-size queue used by the worked threads to distribute tasks
|
||||||
|
*/
|
||||||
|
typedef struct lnm_loop_queue {
|
||||||
|
struct {
|
||||||
|
lnm_loop_conn **arr;
|
||||||
|
size_t len;
|
||||||
|
} buf;
|
||||||
|
size_t head;
|
||||||
|
size_t tail;
|
||||||
|
pthread_mutex_t mutex;
|
||||||
|
pthread_cond_t cond;
|
||||||
|
} lnm_loop_queue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize a new queue with the specified fixed capacity
|
||||||
|
*/
|
||||||
|
lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Queue the given connection. If the queue is currently full, this action
|
||||||
|
* blocks until there is space available.
|
||||||
|
*/
|
||||||
|
void lnm_loop_queue_push(lnm_loop_queue *q, lnm_loop_conn *conn);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Pop a connection from the queue. This action blocks until a connection is
|
||||||
|
* available.
|
||||||
|
*/
|
||||||
|
lnm_loop_conn *lnm_loop_queue_pop(lnm_loop_queue *q);
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -0,0 +1,61 @@
|
||||||
|
#include "lnm/loop.h"
|
||||||
|
|
||||||
|
lnm_err lnm_loop_queue_init(lnm_loop_queue **out, size_t cap) {
|
||||||
|
lnm_loop_conn **arr = malloc(cap * sizeof(lnm_loop_conn *));
|
||||||
|
|
||||||
|
if (arr == NULL) {
|
||||||
|
return lnm_err_failed_alloc;
|
||||||
|
}
|
||||||
|
|
||||||
|
lnm_loop_queue *q = calloc(1, sizeof(lnm_loop_queue));
|
||||||
|
|
||||||
|
if (q == NULL) {
|
||||||
|
free(arr);
|
||||||
|
|
||||||
|
return lnm_err_failed_alloc;
|
||||||
|
}
|
||||||
|
|
||||||
|
q->buf.arr = arr;
|
||||||
|
q->buf.len = cap;
|
||||||
|
|
||||||
|
pthread_mutex_init(&q->mutex, NULL);
|
||||||
|
pthread_cond_init(&q->cond, NULL);
|
||||||
|
|
||||||
|
*out = q;
|
||||||
|
|
||||||
|
return lnm_err_ok;
|
||||||
|
}
|
||||||
|
|
||||||
|
void lnm_loop_queue_push(lnm_loop_queue *q, lnm_loop_conn *conn) {
|
||||||
|
pthread_mutex_lock(&q->mutex);
|
||||||
|
|
||||||
|
while (q->tail == q->head) {
|
||||||
|
pthread_cond_wait(&q->cond, &q->mutex);
|
||||||
|
}
|
||||||
|
|
||||||
|
q->buf.arr[q->tail] = conn;
|
||||||
|
|
||||||
|
// Make sure the index wraps around
|
||||||
|
q->tail = (q->tail + 1) % q->buf.len;
|
||||||
|
|
||||||
|
// Unlock mutex and signal to waiting threads
|
||||||
|
pthread_mutex_unlock(&q->mutex);
|
||||||
|
pthread_cond_signal(&q->cond);
|
||||||
|
}
|
||||||
|
|
||||||
|
lnm_loop_conn *lnm_loop_queue_pop(lnm_loop_queue *q) {
|
||||||
|
pthread_mutex_lock(&q->mutex);
|
||||||
|
|
||||||
|
while (q->tail == q->head) {
|
||||||
|
pthread_cond_wait(&q->cond, &q->mutex);
|
||||||
|
}
|
||||||
|
|
||||||
|
lnm_loop_conn *out = q->buf.arr[q->head];
|
||||||
|
q->head = (q->head + 1) % q->buf.len;
|
||||||
|
|
||||||
|
// Unlock mutex and signal to waiting threads
|
||||||
|
pthread_mutex_unlock(&q->mutex);
|
||||||
|
pthread_cond_signal(&q->cond);
|
||||||
|
|
||||||
|
return out;
|
||||||
|
}
|
Loading…
Reference in New Issue