#include "asyncengine_ruby.h"
#include "ae_handle_common.h"
#include "ae_utils.h"
#include "ae_ip_utils.h"
#include "ae_call_from_other_thread.h"
#include "ae_next_tick.h"
#include "ae_timer.h"
#include "ae_udp.h"
#include "ae_tcp.h"
#include "ae_resolver.h"


/* Ruby modules referenced from this file. */
static VALUE mProcess;
static VALUE mKernel;

/* File-local Ruby values. */
static VALUE AE_thread;
static VALUE AE_pid;
static VALUE AE_barrier;

/* Attribute IDs. */
static ID att_handles;
static ID att_procs;
static ID att_next_tick_procs;
static ID att_call_from_other_thread_procs;
static ID att_user_error_handler;
static ID att_exit_error;
static ID att_on_exit_procs;

/* Method IDs. */
static ID method_destroy;
static ID method_clear;
static ID method_next_tick;
static ID method_call_from_other_thread;
static ID method_pid;
static ID method_raise;
static ID method_call;

/* UV async handle associated with ae_ubf(). */
static uv_async_t *ae_ubf_uv_async;


/** Forward declarations of the static functions defined in this file. */

static VALUE uv_run_without_gvl(void);
static void ae_ubf(void);
static void ae_ubf_uv_async_callback(uv_async_t* handle, int status);
static void ae_release_loop(void);
static int destroy_handle(VALUE key, VALUE handle, VALUE in);
static VALUE destroy_handle_with_rb_protect(VALUE handle);
static int ae_is_running_thread(void);
static VALUE ae_handle_error_with_rb_protect(VALUE error);


/**
 * TODO: Temporary functions for debugging: counters of libuv active handles
 * and active requests.
 */

/*
 * Returns the active_handles field of AE_uv_loop, or 0 when AE_status is
 * AE_STOPPED (in which case AE_uv_loop is not dereferenced).
 */
static int ae_uv_num_active_handlers(void)
{
  AE_TRACE();

  return (AE_status == AE_STOPPED) ? 0 : AE_uv_loop->active_handles;
}


/*
 * Ruby-facing wrapper: returns ae_uv_num_active_handlers() as a Fixnum.
 * The receiver (self) is not used.
 */
static VALUE AsyncEngine_num_uv_active_handles(VALUE self)
{
  AE_TRACE();

  const int num_handles = ae_uv_num_active_handlers();

  return INT2FIX(num_handles);
}


/*
 * Counts the entries in the active_reqs queue of AE_uv_loop by walking it.
 * Returns 0 when AE_status is AE_STOPPED (the loop is not touched then).
 */
static int ae_uv_num_active_reqs(void)
{
  AE_TRACE();

  ngx_queue_t *entry;
  int total = 0;

  if (AE_status != AE_STOPPED) {
    ngx_queue_foreach(entry, &AE_uv_loop->active_reqs) {
      total++;
    }
  }

  return total;
}


/*
 * Ruby-facing wrapper: returns ae_uv_num_active_reqs() as a Fixnum.
 * The receiver (self) is not used.
 */
static VALUE AsyncEngine_num_uv_active_reqs(VALUE self)
{
  AE_TRACE();

  const int num_reqs = ae_uv_num_active_reqs();

  return INT2FIX(num_reqs);
}


/** AE.run method.
*/ static VALUE AsyncEngine_run(int argc, VALUE *argv, VALUE self) { AE_TRACE(); VALUE proc, captured_error, on_exit_procs; int r, i; AE_RB_CHECK_NUM_ARGS(0,1); AE_RB_ENSURE_BLOCK_OR_PROC(1, proc); if (AE_status == AE_RUNNING && (rb_funcall2(mProcess, method_pid, 0, NULL) != AE_pid)) rb_raise(eAsyncEngineError, "cannot run AsyncEngine from a forked process while already running"); // If already running pass the Proc to the reactor and return true. if (AE_status == AE_RUNNING) { if (ae_is_running_thread()) rb_funcall2(mAsyncEngine, method_next_tick, 1, &proc); else rb_funcall2(mAsyncEngine, method_call_from_other_thread, 1, &proc); return Qtrue; } // Acquire the barrier lock. rb_barrier_wait(AE_barrier); /* Set the UV loop. */ AE_uv_loop = uv_default_loop(); // Mark current thread and PID. AE_thread = rb_thread_current(); AE_pid = rb_funcall2(mProcess, method_pid, 0, NULL); // Get the VALUEs for @_handles and @_procs (faster). AE_handles = rb_ivar_get(mAsyncEngine, att_handles); AE_procs = rb_ivar_get(mAsyncEngine, att_procs); AE_ASSERT(AE_status == AE_STOPPED); /* Load the UV idle (AE.next_tick) and UV async (AE.call_from_other_thread) now. */ load_ae_next_tick_uv_idle(); load_ae_call_from_other_thread(); /* Load the UV async for the ae_ubf() function. */ AE_ASSERT(ae_ubf_uv_async == NULL); // TODO: testing. ae_ubf_uv_async = ALLOC(uv_async_t); r = uv_async_init(AE_uv_loop, ae_ubf_uv_async, ae_ubf_uv_async_callback); AE_ASSERT(r == 0); /* Initial status. */ AE_status = AE_RUNNING; /* Pass the given Proc to the reactor via next_tick. */ rb_funcall2(mAsyncEngine, method_next_tick, 1, &proc); // TODO: for testing. AE_ASSERT(ae_uv_num_active_reqs() == 0); AE_DEBUG("UV loop starts..."); // uv_run() will block here until ae_release_loop() is called. rb_thread_call_without_gvl(uv_run_without_gvl, NULL, ae_ubf, NULL); AE_DEBUG("UV loop terminates"); // TODO: for testing. 
AE_ASSERT(ae_uv_num_active_handlers() == 0); AE_ASSERT(ae_uv_num_active_reqs() == 0); // Unset the AE_uv_loop, AE_thread and AE_pid. AE_uv_loop = NULL; AE_thread = Qnil; AE_pid = Qnil; // Get the captured error in @_exit_error (if any) and clean @_exit_error. captured_error = rb_ivar_get(mAsyncEngine, att_exit_error); rb_ivar_set(mAsyncEngine, att_exit_error, Qnil); // Now yes, set the status to AE_STOPPED. AE_status = AE_STOPPED; // Release the barrier lock. rb_barrier_release(AE_barrier); /* * Run the procs within @_on_exit_procs in order by passing the captured error * (is any) as argument. */ on_exit_procs = rb_ivar_get(mAsyncEngine, att_on_exit_procs); rb_ivar_set(mAsyncEngine, att_on_exit_procs, rb_ary_new()); for(i=0 ; i