#include "config.h"

// NOTE(review): the targets of the angle-bracket includes were lost from this
// file; the list below is reconstructed from the names the code uses
// (pthread_*, printf, memset, gettimeofday, clock_gettime, getpid, std::queue).
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <sys/time.h>
#include <time.h>
#include <unistd.h>

#include <queue>

#include "ev_dispatch.h"
#include "ev_http.h"

#define DEBUG
#ifdef DEBUG
#include <assert.h>
// Asserts that the calling thread is the dispatch (event loop) thread.
#define ASSERT_IS_D_THRAD\
  pthread_t tid = pthread_self();\
  assert( pthread_equal( tid, this->m_tid ) );
#else
#define ASSERT_IS_D_THRAD
#endif

// ---
// TODO:
//  libcurl:      is used to handle the http requests
//  sphinxclient: library for the search engine requests
//  memcached:    client library for the memcached requests
//

namespace EVD {

// Fill *tv with the current wall-clock time.
// Returns 0 on success, otherwise the non-zero result of the underlying
// clock call (in which case *tv is not guaranteed to be meaningful).
int Timer::current_time( struct timeval *tv )
{
#if HAVE_CLOCK_GETTIME
  struct timespec ts;
  const int rc = clock_gettime( CLOCK_REALTIME, &ts );
  if( rc != 0 ){ return rc; }
  tv->tv_sec  = ts.tv_sec;
  tv->tv_usec = (long)( ts.tv_nsec / 1000 ); // 1000 nanoseconds per microsecond
  return 0;
#else
  return gettimeofday( tv, 0 );
#endif
}

// Seconds elapsed between *y and now, as a signed floating point value
// (negative when *y lies in the future).
// *y is only read; it is never modified.
double Timer::elapsed_time( struct timeval *y )
{
  struct timeval now;
  current_time( &now );

  // Doing the subtraction in floating point avoids the manual carry handling
  // (and the write-back into the caller's timeval) that integer math needs.
  const double whole_secs = (double)( now.tv_sec - y->tv_sec );
  const double micro_secs = (double)( now.tv_usec - y->tv_usec );
  return whole_secs + ( micro_secs / 1000000.0 );
}

// Pop the front element of the queue, waiting at most once on the queue's
// condition variable (bounded by `timer`) when the queue is empty.
//
//  rstate - out: POPPED when an element is returned, EXITING when `cond` is
//           false after the wait, EXPIRED when the wait ended with the queue
//           still empty.
//  cond   - "keep running" flag, checked after the wait.
//  timer  - wait bound; refreshed with update() right before waiting.
//
// Returns the popped element, or NULL for EXITING / EXPIRED.
// NOTE(review): the template header and Queue<T> qualifier were stripped from
// this file and are reconstructed here -- confirm against ev_dispatch.h.
template <typename T>
T *Queue<T>::pop_or_wait( POP_STATE *rstate, volatile bool &cond, Timer timer )
{
  Guard lock(m_lock);

  T *item = NULL;
  if( m_queue.size() > 0 ) {
    item = m_queue.front();
  }
  else {
    // nothing queued: wait a single time, then look again
    timer.update();
    m_cond.timed_wait( m_lock, timer );

    if( !cond ) {
      *rstate = EXITING;
    }
    else if( m_queue.size() > 0 ) {
      item = m_queue.front();
    }
    else {
      *rstate = EXPIRED;
    }
  }

  if( item ) {
    *rstate = POPPED;
    m_queue.pop();
  }
  return item;
}

// Construct an idle dispatcher; no thread or event loop exists until start().
Dispatch::Dispatch() : m_loop(NULL), m_counter(0), m_loop_started(false), m_http_client(NULL), m_pending(0)
{
  // zero everything out
  memset( &m_clock, 0, sizeof(ev_timer) );
  memset( &m_request_watcher, 0, sizeof(ev_async) );
  memset( &m_loop_ender, 0, sizeof(ev_async) );
  memset( &m_tid, 0, sizeof(pthread_t) );
  m_loop_down = false;
  m_pid = getpid();
}

// Releases the http client.
// NOTE(review): this does not stop the event loop thread; callers appear to be
// expected to call stop() before destroying the object -- confirm.
Dispatch::~Dispatch()
{
  delete m_http_client; // deleting NULL is a no-op
}

// Start up the background event listener.
// Returns true once the event loop thread has reported that it is running,
// false if it was already running, the thread could not be created, or the
// loop did not come up within 100 waits.
bool Dispatch::start()
{
  if( m_loop_started ){ return false; }

  const int rc = pthread_create( &m_tid, NULL, Dispatch::event_loop_start, this );
  if( rc ){ return false; }

  m_http_client = new HttpClient(this);

  // block until we get notified that the event thread is up and running
  Guard lock(m_lock);
  // never wait more than 100 iterations
  for( int count = 0; !m_loop_started && count < 100; ++count ) {
    m_cond.timed_wait( m_lock, Timer(1,0) );
  }

  // report the actual state, so a loop that came up during the final wait
  // still counts as started
  return m_loop_started;
}

// Discard every queued response and empty the table of already-fetched
// responses.
// NOTE(review): the Response pointers are dropped without being deleted;
// confirm who owns them.
void Dispatch::flush()
{
  Guard lock( m_responses.m_lock ); // lock the responses

  while( !m_responses.m_queue.empty() ) {
    m_responses.m_queue.pop();
  }

  // empty out the thread local buffer
  m_responded.clear();
}

// Tell the background event listener to terminate, join its thread and
// release the http client. Does nothing when the loop is not running.
void Dispatch::stop()
{
  if( !m_loop_started ){ return; }

  m_lock.lock();
  m_loop_started = false;
  m_lock.unlock();

  // wake up anybody blocked on the dispatcher or on either queue
  m_cond.broadcast();
  m_requests.signal();
  m_responses.signal();

  // signal to the event loop it's time to stop
  ev_async_send( m_loop, &m_loop_ender );

  pthread_join( m_tid, NULL );

  if( m_http_client ) {
    delete m_http_client;
    m_http_client = NULL;
  }
}

// pthread entry point: all event loop activity happens on this thread
void* Dispatch::event_loop_start( void *ptr )
{
  Dispatch *dis = (Dispatch*)ptr;
  dis->event_loop_main();
  return NULL;
}

// Signal watcher callback: stops the dispatcher stored in w->data.
// NOTE(review): not registered anywhere in this file. stop() joins the event
// loop thread, so running this on that thread would join itself -- verify
// before wiring it up.
static void sigint_cb( struct ev_loop *loop, struct ev_signal *w, int revents )
{
  Dispatch *ptr = ((Dispatch*)w->data);
  ptr->stop();
}

// called when a new request was signaled from the main thread;
// forwards to the Dispatch instance stored in w->data
void Dispatch::request_cb_start( struct ev_loop *loop, struct ev_async *w, int revents )
{
  Dispatch *ptr = (Dispatch*)w->data;
  ptr->request_cb( loop, w, revents );
}

// Drain the shared request queue and enable every request that has a client.
// Runs on the event loop thread.
void Dispatch::request_cb( struct ev_loop *loop, struct ev_async *w, int revents )
{
  ASSERT_IS_D_THRAD

  // Move everything into a private queue so the shared lock is held only for
  // the duration of the copy.
  // NOTE(review): element type reconstructed from request( Request * ).
  std::queue<Request*> new_requests;

  m_requests.m_lock.lock();
  while( !m_requests.m_queue.empty() ) {
    new_requests.push( m_requests.m_queue.front() );
    m_requests.m_queue.pop(); // empty out the request queue
  }
  m_requests.m_lock.unlock();

  // process the new requests
  while( !new_requests.empty() ) {
    HttpRequest *req = (HttpRequest*)new_requests.front();
    new_requests.pop();

    // NOTE: there is a potential for a race condition, where the request is cleaned up but not yet been popped
    // NOTE(review): a request without m_client is silently dropped here.
    if( !req || !req->m_client ){ continue; }

    // inc before we enable the request, since the time to enable and push to responses queue may
    // be long enough for a context switch to allow someone to check request, response and pending count all be zero.
    ++m_pending;
    // XXX: if using DNS in requests expect this method to block while dns resolves
    if( !req->enable() ) {
      --m_pending;
      printf("EVD::Dispatch request error\n");
      // for some reason we couldn't enable the request cleanup the request and continue
      delete req;
    }
  }
}

// Async callback fired by stop(): stops both async watchers and breaks out of
// the event loop.
void Dispatch::shutdown_loop_cb( struct ev_loop *loop, struct ev_async *w, int revents )
{
  Dispatch *d = (Dispatch*)w->data;
  ev_async_stop( d->m_loop, &(d->m_request_watcher) );
  ev_async_stop( d->m_loop, &(d->m_loop_ender) );
  ev_unloop( d->m_loop, EVUNLOOP_ALL );
}

// The main event loop: creates the loop, registers the watchers, announces
// readiness, runs until unlooped and then tears everything down.
void Dispatch::event_loop_main()
{
  m_loop = ev_loop_new(0);

  m_request_watcher.data = this;
  ev_async_init( &m_request_watcher, request_cb_start );
  ev_async_start( m_loop, &m_request_watcher );

  m_loop_ender.data = this;
  ev_async_init( &m_loop_ender, shutdown_loop_cb );
  ev_async_start( m_loop, &m_loop_ender );
  ev_unref( m_loop ); // don't include the ender in the ev_loop life count

  m_lock.lock();
  m_loop_started = true;
  m_loop_down = false;
  m_cond.signal(); // let the world know we're ready for them
  m_lock.unlock();

  // start the main event loop
  ev_loop( m_loop, 0 );

  // the shutdown sequence
  if( m_http_client ) {
    m_http_client->stop();
  }
  ev_loop_destroy( m_loop );

  m_lock.lock();
  m_loop_down = true;
  m_cond.signal(); // let the world know we're done
  m_lock.unlock();

  m_loop = NULL;
}

// timer callback trampoline; forwards to the Dispatch stored in w->data
void Dispatch::timeout_cb_start( struct ev_loop *loop, struct ev_timer *w, int revents )
{
  Dispatch *ptr = ((Dispatch*)w->data);
  ptr->timeout_cb( loop, w, revents );
}

// called on the event loop thread; currently only checks the calling thread
void Dispatch::timeout_cb( struct ev_loop *loop, struct ev_timer *w, int revents )
{
  ASSERT_IS_D_THRAD
}

// Queue a request for the event loop and wake the loop up.
// Called on the main thread, not the event loop thread.
// Returns a unique identifier for matching up the response to this request.
request_t Dispatch::request( Request *req )
{
  const request_t key = m_counter++;
  req->set_key( key );

  m_requests.push( req ); // load the request into the request queue
  req = NULL;             // at this point req is no longer valid on this thread

  ev_async_send( m_loop, &m_request_watcher ); // signal to the event loop we have new requests pending

  return key;
}

// Remove and return the already-fetched response stored under `id`,
// or NULL when no such response has been fetched yet.
Response *Dispatch::response_for( request_t id )
{
  RespondedTable::iterator loc = m_responded.find(id);
  if( loc == m_responded.end() ){ return NULL; }

  Response *res = loc->second;
  m_responded.erase(loc);
  return res;
}

// Make progress towards having the response for `id` available.
//
// Returns POPPED when the response for `id` is already in the responded table,
// or when one response (for any id) was moved from the response queue into
// that table. Returns EXPIRED when the queue was still empty after waiting for
// `timeout`.
// NOTE(review): unlike get_next_response() this does not decrement m_pending,
// and it does not loop until `id` itself shows up -- confirm callers retry.
// NOTE(review): the Queue<Response> qualifier is reconstructed; the template
// argument was stripped from this file.
Queue<Response>::POP_STATE Dispatch::wait_for_response_by_id( request_t id, Timer timeout )
{
  // first check if the response is loaded
  if( m_responded.find(id) != m_responded.end() ){ return Queue<Response>::POPPED; }

  Guard lock( m_responses.m_lock );

  if( m_responses.m_queue.empty() ) {
    // update the timeout to right now
    timeout.update();
    m_responses.m_cond.timed_wait( m_responses.m_lock, timeout );

    // look at the queue again: a response may have arrived while waiting
    if( m_responses.m_queue.empty() ){ return Queue<Response>::EXPIRED; }
  }

  Response *res = m_responses.m_queue.front();
  m_responses.m_queue.pop();
  m_responded[res->id] = res;
  return Queue<Response>::POPPED;
}

// Block until the next response is available and return it.
// Returns NULL when the dispatcher is shutting down, or when a wait expired
// and there is nothing pending, nothing queued as a request and nothing queued
// as a response.
Response *Dispatch::get_next_response( Timer timer )
{
  Response *rep = NULL;
  Queue<Response>::POP_STATE rstate = Queue<Response>::EXPIRED;

  timer.update();
  do {
    rep = m_responses.pop_or_wait( &rstate, m_loop_started, timer );

    if( rstate == Queue<Response>::EXITING ){ return NULL; }

    if( rep ) {
      --m_pending;
    }
    else if( rstate == Queue<Response>::EXPIRED ) {
      // only when nothing is pending is it worth taking the queue locks
      if( m_pending == 0 && m_requests.size() == 0 && m_responses.size() == 0 ) {
        return NULL;
      }
    }

    timer.update();
  } while( !rep && rstate == Queue<Response>::EXPIRED );

  return rep;
}

}// end namespace EVD