ext/revdispatch/libdispatch-0.1/src/ev_dispatch.cc in evdispatch-0.2.2 vs ext/revdispatch/libdispatch-0.1/src/ev_dispatch.cc in evdispatch-0.2.4
- old
+ new
@@ -7,11 +7,11 @@
#ifdef DEBUG
#include <assert.h>
#define ASSERT_IS_D_THRAD\
pthread_t tid = pthread_self();\
- assert( tid == this->m_tid );
+ assert( pthread_equal( tid, this->m_tid ) );
#else
#define ASSERT_IS_D_THRAD
#endif
// ---
@@ -21,10 +21,22 @@
// memcached: client library for the memcached requests
//
namespace EVD {
+int Timer::current_time( struct timeval *tv)
+{
+#if HAVE_CLOCK_GETTIME
+ struct timespec ts;
+ clock_gettime (CLOCK_REALTIME, &ts);
+ tv->tv_sec = ts.tv_sec;
+ tv->tv_usec = (long)(ts.tv_nsec * 0.0001);
+#else
+ gettimeofday (tv, 0);
+#endif
+}
+
double Timer::elapsed_time( struct timeval *y )
{
struct timeval duration;
struct timeval now;
current_time( &now );
@@ -55,10 +67,40 @@
if( x->tv_sec < y->tv_sec ){
return (-1.0 * elasped);
}
return elasped;
}
+template <typename T>
+T *Queue<T>::pop_or_wait( POP_STATE *rstate, volatile bool &cond, Timer timer )
+{
+ Guard lock(m_lock);
+ T *req = NULL;
+ size_t size = m_queue.size();
+ if( size > 0 ){
+ req = m_queue.front();
+ }
+ while( !req ) {
+ timer.update();
+ m_cond.timed_wait( m_lock, timer );
+ if( !cond ){ *rstate = EXITING; break; }
+ size = m_queue.size();
+ if( size > 0 ){
+ req = m_queue.front();
+ break;
+ }
+ else {
+ *rstate = EXPIRED;
+ req = NULL;
+ break;
+ }
+ }
+ if( req ){
+ *rstate = POPPED;
+ m_queue.pop();
+ }
+ return req;
+}
Dispatch::Dispatch() : m_loop(NULL), m_counter(0), m_loop_started(false), m_http_client(NULL), m_pending(0)
{
// zero everything out
memset(&m_clock,0,sizeof(ev_timer));
@@ -80,11 +122,11 @@
int rc;
if( m_loop_started ){ return false; }
rc = pthread_create( &m_tid, NULL, Dispatch::event_loop_start, this );
if( rc ){ return false; }
- pthread_detach( m_tid );
+ //pthread_detach( m_tid );
m_http_client = new HttpClient(this);
// block until we get notified that the event thead is up and running
Guard lock(m_lock);
@@ -115,17 +157,34 @@
// tell the background event listener to terminate
void Dispatch::stop()
{
if( !m_loop_started ){ return ; }
- //printf( "EVD::Dispatch stopping...\n" );
+ printf( "EVD::Dispatch stopping...\n" );
+ m_lock.lock();
m_loop_started = false;
-
m_cond.broadcast();
m_requests.signal();
m_responses.signal();
- ev_unloop( m_loop, EVUNLOOP_ALL );
+ m_lock.unlock();
+
+ // send the message to unloop all
+ {
+ Guard g(m_lock);
+ do {
+ // unloop again
+ printf( "main loop is waiting on the event loop to go down\n" );
+ ev_unloop( m_loop, EVUNLOOP_ALL );
+ if( m_loop_down ) { break; }
+ ev_unref( m_loop ); // decrement the ref count
+ // sleep until the event loop tells us it's done
+ m_cond.timed_wait(m_lock,Timer(1,0)); // wait until we're done
+ }while( !m_loop_down );
+ }
+
+ printf( "okay, we're joining the thread\n" );
+ pthread_join( m_tid, NULL );
if( m_http_client ) {
delete m_http_client;
m_http_client = NULL;
}
}
@@ -154,69 +213,71 @@
}
void Dispatch::request_cb( struct ev_loop *loop, struct ev_async *w, int revents )
{
ASSERT_IS_D_THRAD
+ // lock and read as much as we can out of the queue
+ m_requests.m_lock.lock();
// create a new queue to store incoming requests
std::queue<Request*> new_requests;
- // lock down the request queue
- m_requests.m_lock.lock();
- // lock and read as much as we can out of the queue
- new_requests = m_requests.m_queue;
- m_requests.m_queue = std::queue<Request*>(); // empty out the request queue
+ while( !m_requests.m_queue.empty() ){
+ new_requests.push( m_requests.m_queue.front() );
+ m_requests.m_queue.pop(); // empty out the request queue
+ }
m_requests.m_lock.unlock(); // unlock the queue
// process the new requests
while( !new_requests.empty() ) {
- Request *req = new_requests.front();
+ HttpRequest *req = (HttpRequest*)new_requests.front();
new_requests.pop();
- //printf( "Request: %s, time since requested: %.2f\n", req->url.c_str(), difftime(time(NULL),req->start_time) );
+// printf( "Request: %s, time since requested: %.5lf\n", req->url.c_str(), Timer::elapsed_time( &(req->start_time) ) );
- // inc before we enable the request, since the time to enable and push to responses queue may
- // be long enough for a context switch to allow someone to check request, response and pending count all be zero.
- ++m_pending;
- if( !req->enable() ) { // XXX: if using DNS in requests expect this method to block while dns resolves
- --m_pending;
- printf("EVD::Dispatch request error\n");
- // for some reason we couldn't enable the request cleanup the request and continue
- delete req;
+ // NOTE: there is a potential for a race condition, where the request is cleaned up but not yet been popped
+ if( req && req->m_client ) {
+ // inc before we enable the request, since the time to enable and push to responses queue may
+ // be long enough for a context switch to allow someone to check request, response and pending count all be zero.
+ ++m_pending;
+ if( !req->enable() ) { // XXX: if using DNS in requests expect this method to block while dns resolves
+ --m_pending;
+ printf("EVD::Dispatch request error\n");
+ // for some reason we couldn't enable the request cleanup the request and continue
+ delete req;
+ }
}
}
}
// the main event loop
void Dispatch::event_loop_main()
{
-// struct ev_signal signal_exit_watcher;
- m_loop = ev_loop_new(0); //ev_default_loop(0);
+ m_loop = ev_loop_new(0);
-// m_clock.data = this;
-
- // every 100s of a second we'll check the queue for requests
-// ev_timer_init( &m_clock, timeout_cb_start, 0.5, 0.5 );
-// ev_timer_again( m_loop, &m_clock ); // start timer
-
- // trap sigint to ensure we clean up the event loop before exit
-// signal_exit_watcher.data = this;
-// ev_signal_init( &signal_exit_watcher, sigint_cb, SIGINT );
-// ev_signal_start( m_loop, &signal_exit_watcher );
-
m_request_watcher.data = this;
ev_async_init( &m_request_watcher, request_cb_start );
ev_async_start( m_loop, &m_request_watcher );
- // start the main event loop
-
m_lock.lock();
m_loop_started = true;
+ m_loop_down = false;
m_cond.signal(); // let the world know we're ready for them
m_lock.unlock();
+ // start the main event loop
ev_loop( m_loop, 0 );
+ ev_async_stop( m_loop, &m_request_watcher );
+ m_http_client->stop();
ev_loop_destroy( m_loop );
- pthread_exit(NULL);
+
+ m_lock.lock();
+ printf( "let the world know the event loop is going down\n" );
+ m_loop_down = true;
+ m_cond.signal(); // let the world know we're done
+ m_lock.unlock();
+
+ m_loop = NULL;
+
}
void Dispatch::timeout_cb_start(struct ev_loop *loop, struct ev_timer *w, int revents)
{
Dispatch *ptr = ((Dispatch*)w->data);
ptr->timeout_cb( loop, w, revents );
@@ -233,10 +294,11 @@
request_t Dispatch::request( Request *req )
{
request_t key = m_counter++;
req->set_key( key );
m_requests.push( req ); // load the request into the request queue
+ req = NULL; // at this point req is no longer valid on this thread
ev_async_send( m_loop, &m_request_watcher ); // signal to the event loop we have new requests pending
return key; // return to the client a unique identifier for matching up the response to this request
}
Response *Dispatch::response_for( request_t id )
@@ -249,24 +311,55 @@
}
Queue<Response>::POP_STATE Dispatch::wait_for_response_by_id( request_t id, Timer timeout )
{
// first check if the response is loaded
- Response *rep = NULL;
+ Response *res = NULL;
Queue<Response>::POP_STATE rstate;
RespondedTable::iterator loc = m_responded.find(id);
+ int res_buff_size = m_responded.size();
if( loc != m_responded.end() ){ return Queue<Response>::POPPED; }
+ m_responses.m_lock.lock();
+
+ // see if we can get the response back
+ std::queue<Response*>::size_type size = m_responses.m_queue.size();
+ if( size > 0 ) {
+ res = m_responses.m_queue.front();
+ m_responses.m_queue.pop();
+ m_responded[res->id] = res;
+ }
+ else {
+ // update the timeout to right now
+ timeout.update();
+ m_responses.m_cond.timed_wait( m_responses.m_lock, timeout );
+ if( size > 0 ) {
+ res = m_responses.m_queue.front();
+ m_responses.m_queue.pop();
+ m_responded[res->id] = res;
+ rstate = Queue<Response>::POPPED;
+ }
+ else {
+ rstate = Queue<Response>::EXPIRED;
+ res = NULL;
+ }
+ }
+ m_responses.m_lock.unlock();
+
+ return rstate;
+ /*
timeout.update();
- while( (rep = m_responses.pop_or_wait( &rstate, m_loop_started, timeout )) ) {
- //printf( "rep: %s, id: %d\n", rep->name.c_str(), rep->id );
- m_responded[rep->id] = rep;
- if( rep->id == id ){ return rstate; }
+ while( (res = m_responses.pop_or_wait( &rstate, m_loop_started, timeout )) ) {
+ //printf( "res: %s, id: %d\n", res->name.c_str(), res->id );
+ m_responded[res->id] = res;
+ if( res->id == id ){ return rstate; }
timeout.update();
}
+
return rstate;
+*/
}
Response *Dispatch::get_next_response( Timer timer )
{
Response *rep = NULL;