ext/libuv/src/unix/threadpool.c in uvrb-0.1.4 vs ext/libuv/src/unix/threadpool.c in uvrb-0.2.0
- old
+ new
@@ -28,12 +28,12 @@
static uv_cond_t cond;
static uv_mutex_t mutex;
static unsigned int nthreads;
static uv_thread_t* threads;
static uv_thread_t default_threads[4];
-static ngx_queue_t exit_message;
-static ngx_queue_t wq;
+static QUEUE exit_message;
+static QUEUE wq;
static volatile int initialized;
static void uv__cancelled(struct uv__work* w) {
abort();
@@ -43,51 +43,51 @@
/* To avoid deadlock with uv_cancel() it's crucial that the worker
* never holds the global mutex and the loop-local mutex at the same time.
*/
static void worker(void* arg) {
struct uv__work* w;
- ngx_queue_t* q;
+ QUEUE* q;
(void) arg;
for (;;) {
uv_mutex_lock(&mutex);
- while (ngx_queue_empty(&wq))
+ while (QUEUE_EMPTY(&wq))
uv_cond_wait(&cond, &mutex);
- q = ngx_queue_head(&wq);
+ q = QUEUE_HEAD(&wq);
if (q == &exit_message)
uv_cond_signal(&cond);
else {
- ngx_queue_remove(q);
- ngx_queue_init(q); /* Signal uv_cancel() that the work req is
+ QUEUE_REMOVE(q);
+ QUEUE_INIT(q); /* Signal uv_cancel() that the work req is
executing. */
}
uv_mutex_unlock(&mutex);
if (q == &exit_message)
break;
- w = ngx_queue_data(q, struct uv__work, wq);
+ w = QUEUE_DATA(q, struct uv__work, wq);
w->work(w);
uv_mutex_lock(&w->loop->wq_mutex);
w->work = NULL; /* Signal uv_cancel() that the work req is done
executing. */
- ngx_queue_insert_tail(&w->loop->wq, &w->wq);
+ QUEUE_INSERT_TAIL(&w->loop->wq, &w->wq);
uv_async_send(&w->loop->wq_async);
uv_mutex_unlock(&w->loop->wq_mutex);
}
}
-static void post(ngx_queue_t* q) {
+static void post(QUEUE* q) {
uv_mutex_lock(&mutex);
- ngx_queue_insert_tail(&wq, q);
+ QUEUE_INSERT_TAIL(&wq, q);
uv_cond_signal(&cond);
uv_mutex_unlock(&mutex);
}
@@ -117,23 +117,21 @@
abort();
if (uv_mutex_init(&mutex))
abort();
- ngx_queue_init(&wq);
+ QUEUE_INIT(&wq);
for (i = 0; i < nthreads; i++)
if (uv_thread_create(threads + i, worker, NULL))
abort();
initialized = 1;
}
-#if defined(__GNUC__)
-__attribute__((destructor))
-static void cleanup(void) {
+UV_DESTRUCTOR(static void cleanup(void)) {
unsigned int i;
if (initialized == 0)
return;
@@ -151,11 +149,10 @@
threads = NULL;
nthreads = 0;
initialized = 0;
}
-#endif
void uv__work_submit(uv_loop_t* loop,
struct uv__work* w,
void (*work)(struct uv__work* w),
@@ -172,53 +169,53 @@
int cancelled;
uv_mutex_lock(&mutex);
uv_mutex_lock(&w->loop->wq_mutex);
- cancelled = !ngx_queue_empty(&w->wq) && w->work != NULL;
+ cancelled = !QUEUE_EMPTY(&w->wq) && w->work != NULL;
if (cancelled)
- ngx_queue_remove(&w->wq);
+ QUEUE_REMOVE(&w->wq);
uv_mutex_unlock(&w->loop->wq_mutex);
uv_mutex_unlock(&mutex);
if (!cancelled)
- return -1;
+ return -EBUSY;
w->work = uv__cancelled;
uv_mutex_lock(&loop->wq_mutex);
- ngx_queue_insert_tail(&loop->wq, &w->wq);
+ QUEUE_INSERT_TAIL(&loop->wq, &w->wq);
uv_async_send(&loop->wq_async);
uv_mutex_unlock(&loop->wq_mutex);
return 0;
}
void uv__work_done(uv_async_t* handle, int status) {
struct uv__work* w;
uv_loop_t* loop;
- ngx_queue_t* q;
- ngx_queue_t wq;
+ QUEUE* q;
+ QUEUE wq;
int err;
loop = container_of(handle, uv_loop_t, wq_async);
- ngx_queue_init(&wq);
+ QUEUE_INIT(&wq);
uv_mutex_lock(&loop->wq_mutex);
- if (!ngx_queue_empty(&loop->wq)) {
- q = ngx_queue_head(&loop->wq);
- ngx_queue_split(&loop->wq, q, &wq);
+ if (!QUEUE_EMPTY(&loop->wq)) {
+ q = QUEUE_HEAD(&loop->wq);
+ QUEUE_SPLIT(&loop->wq, q, &wq);
}
uv_mutex_unlock(&loop->wq_mutex);
- while (!ngx_queue_empty(&wq)) {
- q = ngx_queue_head(&wq);
- ngx_queue_remove(q);
+ while (!QUEUE_EMPTY(&wq)) {
+ q = QUEUE_HEAD(&wq);
+ QUEUE_REMOVE(q);
w = container_of(q, struct uv__work, wq);
- err = (w->work == uv__cancelled) ? -UV_ECANCELED : 0;
+ err = (w->work == uv__cancelled) ? -ECANCELED : 0;
w->done(w, err);
}
}
@@ -227,32 +224,29 @@
req->work_cb(req);
}
-static void uv__queue_done(struct uv__work* w, int status) {
+static void uv__queue_done(struct uv__work* w, int err) {
uv_work_t* req;
req = container_of(w, uv_work_t, work_req);
uv__req_unregister(req->loop, req);
if (req->after_work_cb == NULL)
return;
- if (status == -UV_ECANCELED)
- uv__set_artificial_error(req->loop, UV_ECANCELED);
-
- req->after_work_cb(req, status ? -1 : 0);
+ req->after_work_cb(req, err);
}
int uv_queue_work(uv_loop_t* loop,
uv_work_t* req,
uv_work_cb work_cb,
uv_after_work_cb after_work_cb) {
if (work_cb == NULL)
- return uv__set_artificial_error(loop, UV_EINVAL);
+ return -EINVAL;
uv__req_init(loop, req, UV_WORK);
req->loop = loop;
req->work_cb = work_cb;
req->after_work_cb = after_work_cb;
@@ -277,10 +271,10 @@
case UV_WORK:
loop = ((uv_work_t*) req)->loop;
wreq = &((uv_work_t*) req)->work_req;
break;
default:
- return -1;
+ return -EINVAL;
}
return uv__work_cancel(loop, req, wreq);
}