ext/revdispatch/libdispatch-0.1/src/ev_dispatch.cc in evdispatch-0.2.2 vs ext/revdispatch/libdispatch-0.1/src/ev_dispatch.cc in evdispatch-0.2.4

- old
+ new

@@ -7,11 +7,11 @@ #ifdef DEBUG #include <assert.h> #define ASSERT_IS_D_THRAD\ pthread_t tid = pthread_self();\ - assert( tid == this->m_tid ); + assert( pthread_equal( tid, this->m_tid ) ); #else #define ASSERT_IS_D_THRAD #endif // --- @@ -21,10 +21,22 @@ // memcached: client library for the memcached requests // namespace EVD { +int Timer::current_time( struct timeval *tv) +{ +#if HAVE_CLOCK_GETTIME + struct timespec ts; + clock_gettime (CLOCK_REALTIME, &ts); + tv->tv_sec = ts.tv_sec; + tv->tv_usec = (long)(ts.tv_nsec * 0.0001); +#else + gettimeofday (tv, 0); +#endif +} + double Timer::elapsed_time( struct timeval *y ) { struct timeval duration; struct timeval now; current_time( &now ); @@ -55,10 +67,40 @@ if( x->tv_sec < y->tv_sec ){ return (-1.0 * elasped); } return elasped; } +template <typename T> +T *Queue<T>::pop_or_wait( POP_STATE *rstate, volatile bool &cond, Timer timer ) +{ + Guard lock(m_lock); + T *req = NULL; + size_t size = m_queue.size(); + if( size > 0 ){ + req = m_queue.front(); + } + while( !req ) { + timer.update(); + m_cond.timed_wait( m_lock, timer ); + if( !cond ){ *rstate = EXITING; break; } + size = m_queue.size(); + if( size > 0 ){ + req = m_queue.front(); + break; + } + else { + *rstate = EXPIRED; + req = NULL; + break; + } + } + if( req ){ + *rstate = POPPED; + m_queue.pop(); + } + return req; +} Dispatch::Dispatch() : m_loop(NULL), m_counter(0), m_loop_started(false), m_http_client(NULL), m_pending(0) { // zero everything out memset(&m_clock,0,sizeof(ev_timer)); @@ -80,11 +122,11 @@ int rc; if( m_loop_started ){ return false; } rc = pthread_create( &m_tid, NULL, Dispatch::event_loop_start, this ); if( rc ){ return false; } - pthread_detach( m_tid ); + //pthread_detach( m_tid ); m_http_client = new HttpClient(this); // block until we get notified that the event thead is up and running Guard lock(m_lock); @@ -115,17 +157,34 @@ // tell the background event listener to terminate void Dispatch::stop() { if( !m_loop_started ){ return ; } - //printf( "EVD::Dispatch stopping...\n" ); + printf( "EVD::Dispatch stopping...\n" ); + m_lock.lock(); m_loop_started = false; - m_cond.broadcast(); m_requests.signal(); m_responses.signal(); - ev_unloop( m_loop, EVUNLOOP_ALL ); + m_lock.unlock(); + + // send the message to unloop all + { + Guard g(m_lock); + do { + // unloop again + printf( "main loop is waiting on the event loop to go down\n" ); + ev_unloop( m_loop, EVUNLOOP_ALL ); + if( m_loop_down ) { break; } + ev_unref( m_loop ); // decrement the ref count + // sleep until the event loop tells us it's done + m_cond.timed_wait(m_lock,Timer(1,0)); // wait until we're done + }while( !m_loop_down ); + } + + printf( "okay, we're joining the thread\n" ); + pthread_join( m_tid, NULL ); if( m_http_client ) { delete m_http_client; m_http_client = NULL; } } @@ -154,69 +213,71 @@ } void Dispatch::request_cb( struct ev_loop *loop, struct ev_async *w, int revents ) { ASSERT_IS_D_THRAD + // lock and read as much as we can out of the queue + m_requests.m_lock.lock(); // create a new queue to store incoming requests std::queue<Request*> new_requests; - // lock down the request queue - m_requests.m_lock.lock(); - // lock and read as much as we can out of the queue - new_requests = m_requests.m_queue; - m_requests.m_queue = std::queue<Request*>(); // empty out the request queue + while( !m_requests.m_queue.empty() ){ + new_requests.push( m_requests.m_queue.front() ); + m_requests.m_queue.pop(); // empty out the request queue + } m_requests.m_lock.unlock(); // unlock the queue // process the new requests while( !new_requests.empty() ) { - Request *req = new_requests.front(); + HttpRequest *req = (HttpRequest*)new_requests.front(); new_requests.pop(); - //printf( "Request: %s, time since requested: %.2f\n", req->url.c_str(), difftime(time(NULL),req->start_time) ); +// printf( "Request: %s, time since requested: %.5lf\n", req->url.c_str(), Timer::elapsed_time( &(req->start_time) ) ); - // inc before we enable the request, since the time to enable and push to responses queue may - // be long enough for a context switch to allow someone to check request, response and pending count all be zero. - ++m_pending; - if( !req->enable() ) { // XXX: if using DNS in requests expect this method to block while dns resolves - --m_pending; - printf("EVD::Dispatch request error\n"); - // for some reason we couldn't enable the request cleanup the request and continue - delete req; + // NOTE: there is a potential for a race condition, where the request is cleaned up but not yet been popped + if( req && req->m_client ) { + // inc before we enable the request, since the time to enable and push to responses queue may + // be long enough for a context switch to allow someone to check request, response and pending count all be zero. + ++m_pending; + if( !req->enable() ) { // XXX: if using DNS in requests expect this method to block while dns resolves + --m_pending; + printf("EVD::Dispatch request error\n"); + // for some reason we couldn't enable the request cleanup the request and continue + delete req; + } } } } // the main event loop void Dispatch::event_loop_main() { -// struct ev_signal signal_exit_watcher; - m_loop = ev_loop_new(0); //ev_default_loop(0); + m_loop = ev_loop_new(0); -// m_clock.data = this; - - // every 100s of a second we'll check the queue for requests -// ev_timer_init( &m_clock, timeout_cb_start, 0.5, 0.5 ); -// ev_timer_again( m_loop, &m_clock ); // start timer - - // trap sigint to ensure we clean up the event loop before exit -// signal_exit_watcher.data = this; -// ev_signal_init( &signal_exit_watcher, sigint_cb, SIGINT ); -// ev_signal_start( m_loop, &signal_exit_watcher ); - m_request_watcher.data = this; ev_async_init( &m_request_watcher, request_cb_start ); ev_async_start( m_loop, &m_request_watcher ); - // start the main event loop - m_lock.lock(); m_loop_started = true; + m_loop_down = false; m_cond.signal(); // let the world know we're ready for them m_lock.unlock(); + // start the main event loop ev_loop( m_loop, 0 ); + ev_async_stop( m_loop, &m_request_watcher ); + m_http_client->stop(); ev_loop_destroy( m_loop ); - pthread_exit(NULL); + + m_lock.lock(); + printf( "let the world know the event loop is going down\n" ); + m_loop_down = true; + m_cond.signal(); // let the world know we're done + m_lock.unlock(); + + m_loop = NULL; + } void Dispatch::timeout_cb_start(struct ev_loop *loop, struct ev_timer *w, int revents) { Dispatch *ptr = ((Dispatch*)w->data); ptr->timeout_cb( loop, w, revents ); @@ -233,10 +294,11 @@ request_t Dispatch::request( Request *req ) { request_t key = m_counter++; req->set_key( key ); m_requests.push( req ); // load the request into the request queue + req = NULL; // at this point req is no longer valid on this thread ev_async_send( m_loop, &m_request_watcher ); // signal to the event loop we have new requests pending return key; // return to the client a unique identifier for matching up the response to this request } Response *Dispatch::response_for( request_t id ) @@ -249,24 +311,55 @@ } Queue<Response>::POP_STATE Dispatch::wait_for_response_by_id( request_t id, Timer timeout ) { // first check if the response is loaded - Response *rep = NULL; + Response *res = NULL; Queue<Response>::POP_STATE rstate; RespondedTable::iterator loc = m_responded.find(id); + int res_buff_size = m_responded.size(); if( loc != m_responded.end() ){ return Queue<Response>::POPPED; } + m_responses.m_lock.lock(); + + // see if we can get the response back + std::queue<Response*>::size_type size = m_responses.m_queue.size(); + if( size > 0 ) { + res = m_responses.m_queue.front(); + m_responses.m_queue.pop(); + m_responded[res->id] = res; + } + else { + // update the timeout to right now + timeout.update(); + m_responses.m_cond.timed_wait( m_responses.m_lock, timeout ); + if( size > 0 ) { + res = m_responses.m_queue.front(); + m_responses.m_queue.pop(); + m_responded[res->id] = res; + rstate = Queue<Response>::POPPED; + } + else { + rstate = Queue<Response>::EXPIRED; + res = NULL; + } + } + m_responses.m_lock.unlock(); + + return rstate; + /* timeout.update(); - while( (rep = m_responses.pop_or_wait( &rstate, m_loop_started, timeout )) ) { - //printf( "rep: %s, id: %d\n", rep->name.c_str(), rep->id ); - m_responded[rep->id] = rep; - if( rep->id == id ){ return rstate; } + while( (res = m_responses.pop_or_wait( &rstate, m_loop_started, timeout )) ) { + //printf( "res: %s, id: %d\n", res->name.c_str(), res->id ); + m_responded[res->id] = res; + if( res->id == id ){ return rstate; } timeout.update(); } + return rstate; +*/ } Response *Dispatch::get_next_response( Timer timer ) { Response *rep = NULL;