ext/revdispatch/libdispatch-0.1/src/ev_http.cc in evdispatch-0.2.2 vs ext/revdispatch/libdispatch-0.1/src/ev_http.cc in evdispatch-0.2.4

- old
+ new

@@ -8,16 +8,46 @@ #ifdef DEBUG #include <assert.h> #define ASSERT_IS_D_THRAD(id)\ {\ pthread_t tid = pthread_self();\ - assert( tid == id );\ + assert( pthread_equal(tid, id) );\ } #else #define ASSERT_IS_D_THRAD #endif +static void multi_error_report(CURLMcode rc) +{ + switch(rc) { + case CURLM_CALL_MULTI_PERFORM: + printf("curlm:perform\n"); + break; + case CURLM_OK: +// printf("curlm:ok\n"); + break; + case CURLM_BAD_HANDLE: + printf("curlm:bad handle\n"); + break; + case CURLM_BAD_EASY_HANDLE: + printf("curlm:bad easy handle\n"); + break; + case CURLM_OUT_OF_MEMORY: + printf("curlm:oh shit out of memory\n"); + break; + case CURLM_INTERNAL_ERROR: + printf("curlm:oh shit internal error\n"); + break; + case CURLM_BAD_SOCKET: + printf("curlm:bad socket\n"); + break; + case CURLM_UNKNOWN_OPTION: + printf("curlm:unknown option\n"); + break; + } +} + void HttpClient::SocketInfo::response_cb(struct ev_loop *loop, struct ev_io *w, int revents) { CURLMcode rc; HttpClient::SocketInfo *res = ((HttpClient::SocketInfo*)w->data); HttpClient *client = res->m_client; @@ -40,15 +70,18 @@ ev_io_init( m_watcher, response_cb, sock, kind ); ev_io_start( m_client->m_disp->get_loop(), m_watcher ); } void HttpClient::SocketInfo::set_sock( curl_socket_t sock, CURL *e, int action ) { - int kind = (action&CURL_POLL_IN?EV_READ:0)|(action&CURL_POLL_OUT?EV_WRITE:0); + int kind = (action&CURL_POLL_IN?EV_READ:0)|(action&CURL_POLL_OUT?EV_WRITE:0)|(action&CURL_POLL_INOUT?EV_READ|EV_WRITE:0); + ev_io_stop( m_client->m_disp->get_loop(), m_watcher ); ev_io_set( m_watcher, sock, kind ); + ev_io_start( m_client->m_disp->get_loop(), m_watcher ); } void HttpClient::SocketInfo::finish() { +// printf(" sock finish: 0x%X\n", this ); ev_io_stop( m_client->m_disp->get_loop(), m_watcher ); } HttpClient::SocketInfo::~SocketInfo() { free(m_watcher); @@ -58,26 +91,35 @@ int HttpClient::sock_cb(CURL *e, curl_socket_t sock, int action, void *cbp, void *sockp) { HttpClient *client = (HttpClient*)cbp; HttpClient::SocketInfo *sockinfo = (HttpClient::SocketInfo *)sockp; ASSERT_IS_D_THRAD(client->m_disp->get_thread_id()); + if( !sockinfo ) { sockinfo = new HttpClient::SocketInfo( client, sock, e, action ); curl_multi_assign(client->m_handle, sock, sockinfo); +// printf( " new sock: 0x%X\n", sockinfo ); } switch( action ){ case CURL_POLL_REMOVE: // (4) unregister // make sure we clear it out +// printf( " del sock: 0x%X\n", sockinfo ); curl_multi_assign(client->m_handle, sock, NULL); sockinfo->finish(); delete sockinfo; + client->check_handles(); break; case CURL_POLL_NONE: // (0) register, not interested in readiness (yet) +// printf("none\n"); + break; case CURL_POLL_IN: // (1) register, interested in read readiness +// printf("in\n"); case CURL_POLL_OUT: // (2) register, interested in write readiness +// printf("out\n"); case CURL_POLL_INOUT: // (3) register, interested in both read and write readiness +// printf("inout\n"); default: sockinfo->set_sock( sock, e, action ); client->check_handles(); break; } @@ -87,10 +129,11 @@ // called on the event loop thread void HttpClient::timeout_cb(struct ev_loop *loop, struct ev_timer *w, int revents) { CURLMcode rc; HttpClient *client = (HttpClient*)w->data; +// printf( "timeout\n" ); do { rc = curl_multi_socket(client->m_handle, CURL_SOCKET_TIMEOUT, &(client->m_active)); } while( rc == CURLM_CALL_MULTI_PERFORM ); client->check_handles(); } @@ -101,17 +144,26 @@ //timeout.tv_sec = timeout_ms/1000; //timeout.tv_usec = (timeout_ms%1000)*1000; HttpClient *client = (HttpClient*)userp; // update the timer - ev_tstamp timeout = ev_now(client->m_disp->get_loop()) + (timeout_ms * 0.001); + ev_tstamp timeout = /*ev_now(client->m_disp->get_loop()) + */(timeout_ms * 0.001); + if( timeout_ms == 0 ) { + timeout = 0.1; // enforce always timeout + } + if( timeout < 0.05 ) { + timeout = 0.05; // never let it go below this + } +// printf( "timeout set %lf\n", timeout ); if( client->m_timer_set ) { - ev_timer_set( &(client->m_timer), timeout, 0.0 ); - ev_timer_again( client->m_disp->get_loop(), &(client->m_timer) ); +// printf( "timeout again\n" ); + ev_timer_stop( client->m_disp->get_loop(), &(client->m_timer) ); + ev_timer_init( &(client->m_timer), timeout_cb, timeout, timeout ); + ev_timer_start( client->m_disp->get_loop(), &(client->m_timer) ); } else { - ev_timer_init( &(client->m_timer), timeout_cb, timeout, 0.0 ); + ev_timer_init( &(client->m_timer), timeout_cb, timeout, timeout ); client->m_timer.data = client; ev_timer_start( client->m_disp->get_loop(), &(client->m_timer) ); } return 0; @@ -126,17 +178,21 @@ curl_multi_setopt( m_handle, CURLMOPT_SOCKETFUNCTION, sock_cb ); curl_multi_setopt( m_handle, CURLMOPT_SOCKETDATA, this ); curl_multi_setopt( m_handle, CURLMOPT_TIMERFUNCTION, update_timeout_cb ); curl_multi_setopt( m_handle, CURLMOPT_TIMERDATA, this ); } +void HttpClient::stop() +{ + ev_timer_stop(m_disp->get_loop(), &m_timer); +} HttpClient::~HttpClient() { - //printf("cleaning up client\n"); - this->check_handles(); curl_multi_cleanup( m_handle ); + m_handle = NULL; } + void HttpClient::check_handles() { //see: http://curl.haxx.se/lxr/source/docs/examples/ghiper.c HttpRequest *req; CURL *easy; @@ -144,21 +200,24 @@ CURLcode rc; CURLMsg *msg; do { easy = NULL; + //printf( "check handle messages\n" ); while( (msg = curl_multi_info_read( m_handle, &msgs_left )) ) { +// printf( " handle info: %d, left: %d\n", msg->msg, msgs_left ); if( msg->msg == CURLMSG_DONE ) { easy = msg->easy_handle; rc = msg->data.result; break; } } // it's unclear whether it's okay to remove a curl handle while looping through the // handles above, to avoid this we break out and loop again if( easy ) { curl_easy_getinfo( easy, CURLINFO_PRIVATE, &req ); + //printf( " req %s finished\n", req->url.c_str() ); req->finish(rc); delete req; } } while( easy ); @@ -214,10 +273,11 @@ : Request( 0, url ), m_response(new HttpResponse(url,fd)), m_handle(curl_easy_init()), m_client(dispatch.getHttpClient()) { + assert(m_client); init_curl(); } HttpRequest::HttpRequest( Dispatch &dispatch, const std::string &url ) : Request( 0, url ), @@ -239,17 +299,22 @@ } bool HttpRequest::enable() { CURLMcode rc = curl_multi_add_handle( m_client->m_handle, m_handle ); - do { - rc = curl_multi_socket_all( m_client->m_handle, &m_client->m_active ); - } while( rc == CURLM_CALL_MULTI_PERFORM ); + + if( m_client ) { + do { + rc = curl_multi_socket_all( m_client->m_handle, &m_client->m_active ); + } while( rc == CURLM_CALL_MULTI_PERFORM && m_client ); + } - m_client->check_handles(); - - return (rc == CURLM_OK); + if( m_client ) { + m_client->check_handles(); + } + multi_error_report(rc); + return (rc == CURLM_OK || rc == CURLM_CALL_MULTI_PERFORM); } void HttpRequest::finish(CURLcode rc) { curl_multi_remove_handle( m_client->m_handle, m_handle ); @@ -283,22 +348,33 @@ key_loopup["followlocation"] = CURLOPT_FOLLOWLOCATION; key_loopup["maxredirs"] = CURLOPT_MAXREDIRS; key_loopup["referer"] = CURLOPT_REFERER; key_loopup["useragent"] = CURLOPT_USERAGENT; key_loopup["cookie"] = CURLOPT_COOKIE; + key_loopup["post"] = CURLOPT_POSTFIELDS; std::map<std::string,CURLoption>::iterator loc = key_loopup.find(key); if( loc != key_loopup.end() ){ CURLoption val_type = loc->second; if( val_type >= CURLOPTTYPE_LONG && val_type < CURLOPTTYPE_OBJECTPOINT ) { long val = atoi(value.c_str()); - printf( "set opt %s : %ld\n", key.c_str(), val ); curl_easy_setopt( m_handle, val_type, val ); } else if( val_type >= CURLOPTTYPE_OBJECTPOINT && val_type < CURLOPTTYPE_FUNCTIONPOINT ) { - printf( "set opt %s : %s\n", key.c_str(), value.c_str() ); - curl_easy_setopt( m_handle, val_type, value.c_str() ); + if( key == "post" ) { + printf( "set %s as str, with value: %s\n", key.c_str(), value.c_str() ); + // enable HTTP POST + curl_easy_setopt( m_handle, CURLOPT_POST, 1 ); + // set the buffer size to copy + curl_easy_setopt( m_handle, CURLOPT_POSTFIELDSIZE, value.length() ); + // copy the buffer + curl_easy_setopt( m_handle, CURLOPT_COPYPOSTFIELDS, value.c_str() ); +// printf("setup easy handle to perform a post\n"); + } + else { + curl_easy_setopt( m_handle, val_type, value.c_str() ); + } } } } HttpResponse::HttpResponse( const std::string &url ) @@ -309,22 +385,26 @@ : Response(url), m_fd(fd) { } void HttpResponse::write( void *ptr, size_t realsize, size_t size, size_t nmemb ) { +// printf( " write: %s, %d\n", this->name.c_str(), realsize ); if( m_fd == -1 ) { body.append((const char*)ptr,realsize); } else { ::write( m_fd, ptr, realsize ); } + //fwrite( ptr, size, nmemb, stdout ); } void HttpResponse::write_header( void *ptr, size_t realsize, size_t size, size_t nmemb ) { m_header.append((const char*)ptr,realsize); + //fwrite( ptr, size, nmemb, stdout ); } void HttpResponse::finish( HttpClient *client, CURLcode rc ) { +// printf( " finish: %s\n", this->name.c_str() ); if( m_fd == -1 ) { client->m_disp->send_response( this ); } else { close( this->m_fd );