#include "config.h" #include #include "ev_dispatch.h" #include "ev_http.h" #define DEBUG #ifdef DEBUG #include #define ASSERT_IS_D_THRAD\ pthread_t tid = pthread_self();\ assert( tid == this->m_tid ); #else #define ASSERT_IS_D_THRAD #endif // --- // TODO: // libcurl: is used to handle the http requests // sphinxclient: library for the search engine requests // memcached: client library for the memcached requests // namespace EVD { double Timer::elapsed_time( struct timeval *y ) { struct timeval duration; struct timeval now; current_time( &now ); struct timeval *x = &now; struct timeval *result = &duration; // see: http://www.gnu.org/software/libtool/manual/libc/Elapsed-Time.html // Perform the carry for the later subtraction by updating y. if (x->tv_usec < y->tv_usec) { int nsec = (y->tv_usec - x->tv_usec) / 1000000 + 1; y->tv_usec -= 1000000 * nsec; y->tv_sec += nsec; } if (x->tv_usec - y->tv_usec > 1000000) { int nsec = (x->tv_usec - y->tv_usec) / 1000000; y->tv_usec += 1000000 * nsec; y->tv_sec -= nsec; } // Compute the time remaining to wait. // tv_usec is certainly positive. result->tv_sec = x->tv_sec - y->tv_sec; result->tv_usec = x->tv_usec - y->tv_usec; double elasped = ((double)duration.tv_sec + ((double)duration.tv_usec/1000000.0)); // Return 1 if result is negative. if( x->tv_sec < y->tv_sec ){ return (-1.0 * elasped); } return elasped; } Dispatch::Dispatch() : m_loop(NULL), m_counter(0), m_loop_started(false), m_http_client(NULL), m_pending(0) { // zero everything out memset(&m_clock,0,sizeof(ev_timer)); memset(&m_request_watcher,0,sizeof(ev_async)); memset(&m_tid,0,sizeof(pthread_t)); m_pid = getpid(); } Dispatch::~Dispatch() { if( m_http_client ) { delete m_http_client; } } // start up the background event listener bool Dispatch::start() { int rc; if( m_loop_started ){ return false; } rc = pthread_create( &m_tid, NULL, Dispatch::event_loop_start, this ); if( rc ){ return false; } pthread_detach( m_tid ); m_http_client = new HttpClient(this); // block until we get notified that the event thead is up and running Guard lock(m_lock); int count = 0; // wait until the event loop is started while( !m_loop_started && count < 100 ) { // never wait more then 100 iterations m_cond.timed_wait( m_lock, Timer(1,0) ); //printf( "waiting another second\n" ); ++count; } return (count < 100); } // tell the background event listener to terminate void Dispatch::stop() { if( !m_loop_started ){ return ; } //printf( "EVD::Dispatch stopping...\n" ); m_loop_started = false; m_cond.broadcast(); m_requests.signal(); m_responses.signal(); ev_unloop( m_loop, EVUNLOOP_ALL ); if( m_http_client ) { delete m_http_client; m_http_client = NULL; } } // all event loop activity happens on this thread void* Dispatch::event_loop_start( void *ptr ) { Dispatch *dis = (Dispatch*)ptr; dis->event_loop_main(); //printf( "EVD::Dispatch event loop stopped\n" ); return NULL; } static void sigint_cb( struct ev_loop *loop, struct ev_signal *w, int revents ) { Dispatch *ptr = ((Dispatch*)w->data); ptr->stop(); } // called when a new request was signaled from the main thread void Dispatch::request_cb_start( struct ev_loop *loop, struct ev_async *w, int revents ) { Dispatch *ptr = (Dispatch*)w->data; ptr->request_cb( loop, w, revents ); } void Dispatch::request_cb( struct ev_loop *loop, struct ev_async *w, int revents ) { ASSERT_IS_D_THRAD // create a new queue to store incoming requests std::queue new_requests; // lock down the request queue m_requests.m_lock.lock(); // lock and read as much as we can out of the queue new_requests = m_requests.m_queue; m_requests.m_queue = std::queue(); // empty out the request queue m_requests.m_lock.unlock(); // unlock the queue // process the new requests while( !new_requests.empty() ) { Request *req = new_requests.front(); new_requests.pop(); //printf( "Request: %s, time since requested: %.2f\n", req->url.c_str(), difftime(time(NULL),req->start_time) ); // inc before we enable the request, since the time to enable and push to responses queue may // be long enough for a context switch to allow someone to check request, response and pending count all be zero. ++m_pending; if( !req->enable() ) { // XXX: if using DNS in requests expect this method to block while dns resolves --m_pending; printf("EVD::Dispatch request error\n"); // for some reason we couldn't enable the request cleanup the request and continue delete req; } } } // the main event loop void Dispatch::event_loop_main() { // struct ev_signal signal_exit_watcher; m_loop = ev_loop_new(0); //ev_default_loop(0); // m_clock.data = this; // every 100s of a second we'll check the queue for requests // ev_timer_init( &m_clock, timeout_cb_start, 0.5, 0.5 ); // ev_timer_again( m_loop, &m_clock ); // start timer // trap sigint to ensure we clean up the event loop before exit // signal_exit_watcher.data = this; // ev_signal_init( &signal_exit_watcher, sigint_cb, SIGINT ); // ev_signal_start( m_loop, &signal_exit_watcher ); m_request_watcher.data = this; ev_async_init( &m_request_watcher, request_cb_start ); ev_async_start( m_loop, &m_request_watcher ); // start the main event loop m_lock.lock(); m_loop_started = true; m_cond.signal(); // let the world know we're ready for them m_lock.unlock(); ev_loop( m_loop, 0 ); ev_loop_destroy( m_loop ); pthread_exit(NULL); } void Dispatch::timeout_cb_start(struct ev_loop *loop, struct ev_timer *w, int revents) { Dispatch *ptr = ((Dispatch*)w->data); ptr->timeout_cb( loop, w, revents ); } // called on the event loop thread void Dispatch::timeout_cb(struct ev_loop *loop, struct ev_timer *w, int revents) { ASSERT_IS_D_THRAD //printf("timeout cb\n"); } // called on the main thread, not the event loop thread request_t Dispatch::request( Request *req ) { request_t key = m_counter++; req->set_key( key ); m_requests.push( req ); // load the request into the request queue ev_async_send( m_loop, &m_request_watcher ); // signal to the event loop we have new requests pending return key; // return to the client a unique identifier for matching up the response to this request } Response *Dispatch::response_for( request_t id ) { RespondedTable::iterator loc = m_responded.find(id); if( loc == m_responded.end() ){ return NULL; } Response *res = loc->second; m_responded.erase(loc); return res; } Queue::POP_STATE Dispatch::wait_for_response_by_id( request_t id, Timer timeout ) { // first check if the response is loaded Response *rep = NULL; Queue::POP_STATE rstate; RespondedTable::iterator loc = m_responded.find(id); if( loc != m_responded.end() ){ return Queue::POPPED; } while( (rep = m_responses.pop_or_wait( &rstate, m_loop_started, timeout )) ) { //printf( "rep: %s, id: %d\n", rep->name.c_str(), rep->id ); m_responded[rep->id] = rep; if( rep->id == id ){ return rstate; } } return rstate; } Response *Dispatch::get_next_response( Timer timer ) { Response *rep = NULL; Queue::POP_STATE rstate; do { rep = m_responses.pop_or_wait( &rstate, m_loop_started, timer ); if( rstate == Queue::EXITING ){ rep = NULL; break; } if( !rep && rstate == Queue::EXPIRED ) { int pending_count = m_pending; if( pending_count == 0 ) { // if pending is zero then acquire locks on the request and response queue int request_count = m_requests.size(); int response_count = m_responses.size(); //printf( "pending: %d, request: %d, response: %d\n", pending_count, request_count, response_count ); if( pending_count == 0 && request_count == 0 && response_count == 0 ) { return NULL; } } } else if( rep ) { --m_pending; } } while( !rep && rstate == Queue::EXPIRED ); return rep; } }// end namespace EVD