#include "config.h"

// NOTE(review): the targets of the two angle-bracket includes in this file
// were not legible in the reviewed text.  The system headers below cover
// everything this translation unit visibly uses -- confirm against the
// original include list.
#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

#include <queue>

#include "ev_dispatch.h"
#include "ev_http.h"

#define DEBUG

#ifdef DEBUG
#include <assert.h>
// Asserts that the calling thread is the dispatch (event loop) thread.
// pthread_t is an opaque type, so it is compared with pthread_equal()
// instead of operator==.
#define ASSERT_IS_D_THRAD \
  assert( pthread_equal( pthread_self(), this->m_tid ) );
#else
#define ASSERT_IS_D_THRAD
#endif

// ---
// TODO:
//  libcurl: is used to handle the http requests
//  sphinxclient: library for the search engine requests
//  memcached: client library for the memcached requests
//

namespace EVD {

// Builds an idle dispatcher: no loop, no HTTP client, nothing pending.
Dispatch::Dispatch()
  : m_loop(NULL),
    m_counter(0),
    m_loop_started(false),
    m_http_client(NULL),
    m_pending(0)
{
  // zero everything out
  memset( &m_clock, 0, sizeof(ev_timer) );
  memset( &m_request_watcher, 0, sizeof(ev_async) );
  memset( &m_tid, 0, sizeof(pthread_t) );
  m_pid = getpid();
}

// Releases the HTTP client if stop() has not already done so.
Dispatch::~Dispatch()
{
  delete m_http_client; // deleting NULL is a no-op
}

// Start up the background event listener.
//
// Spawns the (detached) event loop thread, creates the HTTP client and then
// blocks, for at most 100 one-second waits, until the thread reports that
// the loop is running.
//
// Returns true when the loop is running on return; false when it was already
// running, the thread could not be created, or the loop did not come up in
// time.
bool Dispatch::start()
{
  if( m_loop_started ){ return false; }

  const int rc = pthread_create( &m_tid, NULL, Dispatch::event_loop_start, this );
  if( rc ){ return false; }
  pthread_detach( m_tid );

  // A previous start() that timed out may have left a client behind; reuse
  // it instead of overwriting (and leaking) the pointer.
  if( !m_http_client ) {
    m_http_client = new HttpClient(this);
  }

  // block until we get notified that the event thread is up and running
  Guard lock(m_lock);
  int count = 0;

  // wait until the event loop is started, never more than 100 iterations
  while( !m_loop_started && count < 100 ) {
    m_cond.timed_wait( m_lock, Timer(1,0) );
    //printf( "waiting another second\n" );
    ++count;
  }

  // Report the actual state.  Testing `count < 100` would claim failure when
  // the loop came up during the very last wait.
  return m_loop_started;
}

// Tell the background event listener to terminate.
//
// Clears the running flag, wakes everyone blocked on the condition variable
// or on either queue, asks the loop to unwind and frees the HTTP client.
// Does nothing when the loop is not running.
//
// NOTE(review): ev_unloop() is invoked on whichever thread calls stop().
// libev documents ev_async_send() as the call that is safe from a foreign
// thread, so this may not wake a loop that is blocked in its backend --
// confirm.  The HTTP client is also deleted without waiting for the detached
// loop thread to finish.
void Dispatch::stop()
{
  if( !m_loop_started ){ return; }
  //printf( "EVD::Dispatch stopping...\n" );

  m_loop_started = false;
  m_cond.broadcast();
  m_requests.signal();
  m_responses.signal();
  ev_unloop( m_loop, EVUNLOOP_ALL );

  if( m_http_client ) {
    delete m_http_client;
    m_http_client = NULL;
  }
}

// pthread entry point: all event loop activity happens on this thread.
// `ptr` is the Dispatch instance handed to pthread_create() by start().
void* Dispatch::event_loop_start( void *ptr )
{
  Dispatch *self = static_cast<Dispatch*>(ptr);
  self->event_loop_main();
  //printf( "EVD::Dispatch event loop stopped\n" );
  return NULL;
}

// libev signal callback that stops the dispatcher stored in w->data.
// Only referenced by the (currently commented out) SIGINT watcher in
// event_loop_main().
static void sigint_cb( struct ev_loop *loop, struct ev_signal *w, int revents )
{
  Dispatch *self = static_cast<Dispatch*>(w->data);
  self->stop();
}

// libev async trampoline: called when a new request was signaled from the
// main thread; forwards to the instance stored in w->data.
void Dispatch::request_cb_start( struct ev_loop *loop, struct ev_async *w, int revents )
{
  Dispatch *self = static_cast<Dispatch*>(w->data);
  self->request_cb( loop, w, revents );
}

// Drains the request queue and enables every request found in it.
// Runs on the event loop thread.
void Dispatch::request_cb( struct ev_loop *loop, struct ev_async *w, int revents )
{
  ASSERT_IS_D_THRAD

  // Take everything out of the shared queue in one O(1) swap so the lock is
  // held as briefly as possible; the shared queue is left empty.
  std::queue<Request*> new_requests;
  m_requests.m_lock.lock();
  new_requests.swap( m_requests.m_queue );
  m_requests.m_lock.unlock();

  // process the new requests
  while( !new_requests.empty() ) {
    Request *req = new_requests.front();
    new_requests.pop();
    if( !req ){ continue; } // defensive: never dereference an empty slot

    //printf( "Request: %s, time since requested: %.2f\n", req->url.c_str(), difftime(time(NULL),req->start_time) );

    // inc before we enable the request, since the time to enable and push to
    // the responses queue may be long enough for a context switch to allow
    // someone to see request, response and pending count all be zero.
    ++m_pending;
    // XXX: if using DNS in requests expect enable() to block while dns resolves
    if( !req->enable() ) {
      --m_pending;
      printf("EVD::Dispatch request error\n");
      // for some reason we couldn't enable the request: clean it up and continue
      delete req;
    }
  }
}

// The main event loop: creates the loop, registers the async request
// watcher, announces readiness to start() and runs until the loop unwinds.
void Dispatch::event_loop_main()
{
  // struct ev_signal signal_exit_watcher;
  m_loop = ev_loop_new(0); //ev_default_loop(0);
  if( !m_loop ) {
    // No loop could be created.  m_loop_started stays false, so start()
    // gives up after its wait budget and returns false.
    return;
  }

  // m_clock.data = this;

  // every 100s of a second we'll check the queue for requests
  // ev_timer_init( &m_clock, timeout_cb_start, 0.5, 0.5 );
  // ev_timer_again( m_loop, &m_clock ); // start timer

  // trap sigint to ensure we clean up the event loop before exit
  // signal_exit_watcher.data = this;
  // ev_signal_init( &signal_exit_watcher, sigint_cb, SIGINT );
  // ev_signal_start( m_loop, &signal_exit_watcher );

  m_request_watcher.data = this;
  ev_async_init( &m_request_watcher, request_cb_start );
  ev_async_start( m_loop, &m_request_watcher );

  // let the world know we're ready for them
  m_lock.lock();
  m_loop_started = true;
  m_cond.signal();
  m_lock.unlock();

  // start the main event loop
  ev_loop( m_loop, 0 );

  ev_loop_destroy( m_loop );

  pthread_exit(NULL);
}

// libev timer trampoline; forwards to the instance stored in w->data.
void Dispatch::timeout_cb_start( struct ev_loop *loop, struct ev_timer *w, int revents )
{
  Dispatch *self = static_cast<Dispatch*>(w->data);
  self->timeout_cb( loop, w, revents );
}

// Timer callback, called on the event loop thread.  Currently does nothing
// beyond the thread check.
void Dispatch::timeout_cb( struct ev_loop *loop, struct ev_timer *w, int revents )
{
  ASSERT_IS_D_THRAD
  //printf("timeout cb\n");
}

// Queues a new request and wakes the event loop.
// Called on the main thread, not the event loop thread.
//
// type: kind of request; only Request::HTTP is implemented.
// url:  target of the request.
//
// Returns the key assigned to the request.  A key is handed out even when
// nothing was queued (loop not running, or unsupported type); no response
// will ever arrive for such a key.
request_t Dispatch::request( Request::Type type, const std::string &url )
{
  request_t key = m_counter++;

  // Without a running loop there is no valid m_loop to signal and the HTTP
  // client may be gone, so nothing can be queued.
  if( !m_loop_started ){ return key; }

  Request *req = NULL;

  // check the request type and build the matching request object
  switch( type ) {
  case Request::HTTP:
    req = new HttpRequest( m_http_client, key, url );
    break;
  case Request::SPHINX:
    //req = new SphinxRequest( key, url );
    break;
  case Request::MEMCACHED:
    break;
  default:
    // TODO: log an error
    break;
  }

  // Unsupported types produce no request object.  Pushing NULL would make
  // request_cb() dereference it on the event loop thread.
  if( !req ){ return key; }

  m_requests.push( req );
  ev_async_send( m_loop, &m_request_watcher );

  return key;
}

// Removes and returns the stored response for `id`, or NULL when none has
// been recorded by wait_for_response_by_id().
Response *Dispatch::response_for( request_t id )
{
  RespondedTable::iterator loc = m_responded.find(id);
  if( loc == m_responded.end() ){ return NULL; }

  Response *res = loc->second;
  m_responded.erase(loc);
  return res;
}

// Pops responses until the one for `id` shows up, parking every popped
// response in m_responded so response_for() can retrieve it later.
//
// Returns Queue::POPPED immediately when the response is already parked;
// otherwise the state reported by the last pop_or_wait() call.
//
// NOTE(review): the template argument of Queue was not legible in the
// reviewed text; Response is assumed here -- confirm against ev_dispatch.h.
Queue<Response>::POP_STATE Dispatch::wait_for_response_by_id( request_t id, Timer timeout )
{
  // first check if the response is loaded
  if( m_responded.find(id) != m_responded.end() ){ return Queue<Response>::POPPED; }

  Response *rep = NULL;
  Queue<Response>::POP_STATE rstate;

  while( (rep = m_responses.pop_or_wait( &rstate, m_loop_started, timeout )) ) {
    //printf( "rep: %s, id: %d\n", rep->name.c_str(), rep->id );
    // The response has left the queue, so it is no longer pending.  This
    // mirrors get_next_response(); without it m_pending never returns to
    // zero and get_next_response() cannot detect that all work is done.
    --m_pending;
    m_responded[rep->id] = rep;
    if( rep->id == id ){ return rstate; }
  }

  return rstate;
}

// Returns the next available response, waiting in slices of `timer`.
//
// Returns NULL when the queue reports EXITING, when a wait ends without a
// response for any reason other than EXPIRED, or when a wait expired while
// nothing is pending and both queues are empty.
Response *Dispatch::get_next_response( Timer timer )
{
  Queue<Response>::POP_STATE rstate;

  for(;;) {
    Response *rep = m_responses.pop_or_wait( &rstate, m_loop_started, timer );

    if( rstate == Queue<Response>::EXITING ){ return NULL; }

    if( rep ) {
      --m_pending;
      return rep;
    }

    if( rstate != Queue<Response>::EXPIRED ){ return NULL; }

    // The wait expired without a response: stop only when there is provably
    // nothing left that could still produce one.
    int pending_count = m_pending;
    if( pending_count == 0 ) {
      // if pending is zero then acquire locks on the request and response queue
      int request_count = m_requests.size();
      int response_count = m_responses.size();
      //printf( "pending: %d, request: %d, response: %d\n", pending_count, request_count, response_count );
      if( request_count == 0 && response_count == 0 ) {
        return NULL;
      }
    }
  }
}

}// end namespace EVD