#ifndef EV_DISPATCH_H #define EV_DISPATCH_H #include "config.h" #include #include #include #include #include #include #include #include namespace EVD { // unique id to represent a request typedef unsigned long request_t; struct Request { enum Type{ HTTP, SPHINX, MEMCACHED }; Request( request_t k, const std::string &u ) : key(k), url(u){ time( &start_time); } virtual ~Request (){ } // by default this does nothing, each real request object can do as it pleases with this feature virtual void set_opt( const std::string &key, const std::string &value ){} virtual void set_key( request_t key ){ this->key = key; } // attach to the event loop virtual bool enable() = 0; // key will be set by the dispatch object when you invoke this request request_t key; std::string url; time_t start_time; }; // TODO: //struct MemcachedRequesst : public Request { //}; // TODO: //struct SphinxRequest : public Request { //}; struct Response { virtual ~Response(){} std::string name; std::string body; request_t id; double response_time; // computed as difftime(start_time,end_time); }; struct Mutex { Mutex() { pthread_mutex_init( &m_lock, NULL ); } ~Mutex() { pthread_mutex_destroy( &m_lock ); } void lock() { pthread_mutex_lock( &m_lock ); } void unlock() { pthread_mutex_unlock( &m_lock ); } pthread_mutex_t m_lock; }; struct Timer { Timer( long int seconds, long int nanoseconds ){ struct timeval now; gettimeofday(&now, NULL); m_time.tv_sec = now.tv_sec + seconds; m_time.tv_nsec = (now.tv_usec * 1000) + nanoseconds; } struct timespec m_time; }; struct Cond { Cond() { pthread_cond_init( &m_cond, NULL ); } ~Cond() { pthread_cond_destroy( &m_cond ); } void wait( Mutex &m ) { pthread_cond_wait( &m_cond, &(m.m_lock) ); } // wait 500 ms // timed_wait( m, Timer(0,500) ); void timed_wait( Mutex &m, const Timer &t ) { pthread_cond_timedwait( &m_cond, &(m.m_lock), &(t.m_time) ); } void signal() { pthread_cond_signal( &m_cond ); } void broadcast() { pthread_cond_broadcast( &m_cond ); } pthread_cond_t m_cond; }; struct Guard { Guard( Mutex &mutex ): m_mutex(mutex){ m_mutex.lock(); } ~Guard(){ m_mutex.unlock(); } Mutex &m_mutex; }; // thread safe queue template struct Queue { Queue() {} ~Queue() {} // the reason the pop returned, time expired, event loop is exiting, or an object was popped enum POP_STATE { POPPED = 0, EXPIRED = 1, EXITING = 2 }; // heap allocated T object void push( T *req ) { m_lock.lock(); m_queue.push( req ); m_lock.unlock(); m_cond.signal(); } // returns a value from the queue // rstate: the status of the return, see POP_STATE above. // cond: an external reason to abort and not pop e.g. exiting event loop T *pop_or_wait( POP_STATE *rstate, volatile bool &cond = true, Timer timer = Timer(1,0) ) { Guard lock(m_lock); T *req = NULL; size_t size = m_queue.size(); if( size > 0 ){ req = m_queue.front(); } while( !req ) { m_cond.timed_wait( m_lock, timer ); if( !cond ){ *rstate = EXITING; break; } size = m_queue.size(); if( size > 0 ){ req = m_queue.front(); } else { *rstate = EXPIRED; req = NULL; break; } } if( req ){ *rstate = POPPED; m_queue.pop(); } return req; } size_t size(){ Guard lock(m_lock); return m_queue.size(); } T *pop() { Guard lock(m_lock); T *req = NULL; req = m_queue.front(); if( req ){ m_queue.pop(); } return req; } void signal(){ m_cond.signal(); } std::queue m_queue; Mutex m_lock; Cond m_cond; }; // used to keep track of pending requests vs completed responses struct AtomicCounter { // initialization is not threadsafe AtomicCounter( int init ) : m_count(init){} inline operator int(){ Guard g(m_lock); return m_count; } inline int operator++(){ Guard g(m_lock); return ++m_count; } inline int operator--(){ Guard g(m_lock); return --m_count; } Mutex m_lock; int m_count; }; // // Dispatch requests to a thread running an event loop // all methods here should be used within the same thread // // create a Dispatcher // call start // // send the dispatcher some requests // // do some work // // request the dispatcher results // // TODO: optionally you can tell the dispatcher to store the results on the file system via a pipe // struct Dispatch { typedef std::map RespondedTable; Dispatch(); ~Dispatch(); // start up the background event listener bool start(); // tell the background event listener to terminate void stop(); request_t request( Request *req ); // from the main thread, get the next available response // just keep pop'ing off the response queue until we get something // if you get a non NULL response from this method it's your responsiblity to delete it // // Use this method if you want to get all pending requests Response *get_next_response( Timer timer = Timer(1,0) ); // get a specific respone by id // if you get a non NULL response from this method it's your responsiblity to delete it // // Use this method if you want to get a specific request Response *response_for( request_t id ); // call this method to block and wait for a specific request // then call response_for to retrieve the response, in the background this will collect // other pending requests into an internal lookup table Queue::POP_STATE wait_for_response_by_id( request_t id, Timer timeout ); // from the backend called on the event loop thread inline void send_response( Response *response ) { m_responses.push( response ); } inline struct ev_loop *get_loop(){ return m_loop; } inline pthread_t get_thread_id()const{ return m_tid; } inline struct HttpClient *getHttpClient()const{ return m_http_client; } protected: // all event loop activity happens on this thread static void* event_loop_start( void *ptr ); void event_loop_main(); static void timeout_cb_start(struct ev_loop *loop, struct ev_timer *w, int revents); void timeout_cb(struct ev_loop *loop, struct ev_timer *w, int revents); static void request_cb_start( struct ev_loop *loop, struct ev_async *w, int revents ); void request_cb( struct ev_loop *loop, struct ev_async *w, int revents ); bool store_response_for( request_t id ); protected: struct ev_loop *m_loop; // this triggers a callback once every N milliseconds struct ev_timer m_clock; struct ev_async m_request_watcher; pid_t m_pid; pthread_t m_tid; request_t m_counter; // used to create ids for each requests // sync startup to main thread Mutex m_lock; Cond m_cond; bool m_loop_started; Queue m_requests; Queue m_responses; // stores all responded messages stored when calling wait_for_request_by_id( request_t id ) RespondedTable m_responded; struct HttpClient *m_http_client; AtomicCounter m_pending; }; } #endif