#include "config.h" #include "ev_http.h" namespace EVD { #define DEBUG #ifdef DEBUG #include #define ASSERT_IS_D_THRAD(id)\ {\ pthread_t tid = pthread_self();\ assert( tid == id );\ } #else #define ASSERT_IS_D_THRAD #endif void HttpClient::SocketInfo::response_cb(struct ev_loop *loop, struct ev_io *w, int revents) { CURLMcode rc; HttpClient::SocketInfo *res = ((HttpClient::SocketInfo*)w->data); HttpClient *client = res->m_client; int ev_bitmask = 0; ASSERT_IS_D_THRAD(client->m_disp->get_thread_id()); do { ev_bitmask |= ((revents & EV_READ)?CURL_CSELECT_IN:0) | ((revents & EV_WRITE)?CURL_CSELECT_OUT:0); rc = curl_multi_socket_action( client->m_handle, w->fd, ev_bitmask, &(client->m_active) ); } while( rc == CURLM_CALL_MULTI_PERFORM ); client->check_handles(); } HttpClient::SocketInfo::SocketInfo( HttpClient *client, curl_socket_t sock, CURL *e, int action ) : m_action(action), m_timeout(0), m_watcher(0), m_handle(e), m_client( client ) { m_watcher = (struct ev_io*)calloc(1,sizeof(struct ev_io)); int kind = (action&CURL_POLL_IN?EV_READ:0)|(action&CURL_POLL_OUT?EV_WRITE:0); m_watcher->data = this; ev_io_init( m_watcher, response_cb, sock, kind ); ev_io_start( m_client->m_disp->get_loop(), m_watcher ); } void HttpClient::SocketInfo::set_sock( curl_socket_t sock, CURL *e, int action ) { int kind = (action&CURL_POLL_IN?EV_READ:0)|(action&CURL_POLL_OUT?EV_WRITE:0); ev_io_set( m_watcher, sock, kind ); } void HttpClient::SocketInfo::finish() { ev_io_stop( m_client->m_disp->get_loop(), m_watcher ); } HttpClient::SocketInfo::~SocketInfo() { free(m_watcher); } // CURLMOPT_SOCKETFUNCTION int HttpClient::sock_cb(CURL *e, curl_socket_t sock, int action, void *cbp, void *sockp) { HttpClient *client = (HttpClient*)cbp; HttpClient::SocketInfo *sockinfo = (HttpClient::SocketInfo *)sockp; ASSERT_IS_D_THRAD(client->m_disp->get_thread_id()); if( !sockinfo ) { sockinfo = new HttpClient::SocketInfo( client, sock, e, action ); curl_multi_assign(client->m_handle, sock, sockinfo); } switch( action ){ case CURL_POLL_REMOVE: // (4) unregister // make sure we clear it out curl_multi_assign(client->m_handle, sock, NULL); sockinfo->finish(); delete sockinfo; break; case CURL_POLL_NONE: // (0) register, not interested in readiness (yet) case CURL_POLL_IN: // (1) register, interested in read readiness case CURL_POLL_OUT: // (2) register, interested in write readiness case CURL_POLL_INOUT: // (3) register, interested in both read and write readiness default: sockinfo->set_sock( sock, e, action ); client->check_handles(); break; } return 0; } // called on the event loop thread void HttpClient::timeout_cb(struct ev_loop *loop, struct ev_timer *w, int revents) { CURLMcode rc; HttpClient *client = (HttpClient*)w->data; do { rc = curl_multi_socket(client->m_handle, CURL_SOCKET_TIMEOUT, &(client->m_active)); } while( rc == CURLM_CALL_MULTI_PERFORM ); client->check_handles(); } int HttpClient::update_timeout_cb(CURLM *multi, long timeout_ms, void *userp) { //struct timeval timeout; //timeout.tv_sec = timeout_ms/1000; //timeout.tv_usec = (timeout_ms%1000)*1000; HttpClient *client = (HttpClient*)userp; // update the timer ev_tstamp timeout = ev_now(client->m_disp->get_loop()) + (timeout_ms * 0.001); if( client->m_timer_set ) { ev_timer_set( &(client->m_timer), timeout, 0.0 ); ev_timer_again( client->m_disp->get_loop(), &(client->m_timer) ); } else { ev_timer_init( &(client->m_timer), timeout_cb, timeout, 0.0 ); client->m_timer.data = client; ev_timer_start( client->m_disp->get_loop(), &(client->m_timer) ); } return 0; } HttpClient::HttpClient( Dispatch *disp ) : m_active(0), m_handle( curl_multi_init() ), m_disp(disp), m_timer_set(false) { memset(&m_timer, 0, sizeof(struct ev_timer)); curl_multi_setopt( m_handle, CURLMOPT_SOCKETFUNCTION, sock_cb ); curl_multi_setopt( m_handle, CURLMOPT_SOCKETDATA, this ); curl_multi_setopt( m_handle, CURLMOPT_TIMERFUNCTION, update_timeout_cb ); curl_multi_setopt( m_handle, CURLMOPT_TIMERDATA, this ); } HttpClient::~HttpClient() { //printf("cleaning up client\n"); this->check_handles(); curl_multi_cleanup( m_handle ); } void HttpClient::check_handles() { //see: http://curl.haxx.se/lxr/source/docs/examples/ghiper.c HttpRequest *req; CURL *easy; int msgs_left; CURLcode rc; CURLMsg *msg; do { easy = NULL; while( (msg = curl_multi_info_read( m_handle, &msgs_left )) ) { if( msg->msg == CURLMSG_DONE ) { easy = msg->easy_handle; rc = msg->data.result; break; } } // it's unclear whether it's okay to remove a curl handle while looping through the // handles above, to avoid this we break out and loop again if( easy ) { curl_easy_getinfo( easy, CURLINFO_PRIVATE, &req ); req->finish(rc); delete req; } } while( easy ); } // CURLOPT_WRITEFUNCTION size_t HttpRequest::write_cb(void *ptr, size_t size, size_t nmemb, void *data) { size_t realsize = size * nmemb; HttpRequest *req = (HttpRequest *)data; req->response->body.append((const char*)ptr,realsize); //fprintf(stderr, "Write: %s (%lu) => (%d)%s\n", req->url.c_str(), realsize, (int)req->response, req->response->body.c_str() ); return realsize; } // CURLOPT_PROGRESSFUNCTION int HttpRequest::prog_cb(void *p, double dltotal, double dlnow, double ult, double uln) { HttpRequest *req = (HttpRequest *)p; //fprintf(stderr, "Progress: %s (%g/%g)\n", req->url.c_str(), dlnow, dltotal); return 0; } HttpRequest::HttpRequest( HttpClient* client, request_t k, const std::string &url ) : Request( k, url ), response(NULL), m_handle(curl_easy_init()), m_client(client) { memset(error,'\0', CURL_ERROR_SIZE); curl_easy_setopt(m_handle, CURLOPT_URL, url.c_str()); curl_easy_setopt(m_handle, CURLOPT_WRITEFUNCTION, write_cb); curl_easy_setopt(m_handle, CURLOPT_WRITEDATA, this); curl_easy_setopt(m_handle, CURLOPT_VERBOSE, 0); curl_easy_setopt(m_handle, CURLOPT_ERRORBUFFER, error); curl_easy_setopt(m_handle, CURLOPT_PRIVATE, this); curl_easy_setopt(m_handle, CURLOPT_NOPROGRESS, 0); curl_easy_setopt(m_handle, CURLOPT_PROGRESSFUNCTION, prog_cb); curl_easy_setopt(m_handle, CURLOPT_PROGRESSDATA, this); this->response = new Response(); this->response->name = url; this->response->id = k; } HttpRequest::~HttpRequest() { curl_easy_cleanup( m_handle ); #ifdef DEBUG response = NULL; m_handle = NULL; m_client = NULL; #endif } bool HttpRequest::enable() { CURLMcode rc = curl_multi_add_handle( m_client->m_handle, m_handle ); do { rc = curl_multi_socket_all( m_client->m_handle, &m_client->m_active ); } while( rc == CURLM_CALL_MULTI_PERFORM ); m_client->check_handles(); return (rc == CURLM_OK); } void HttpRequest::finish(CURLcode rc) { curl_multi_remove_handle( m_client->m_handle, m_handle ); // add the response object here to the responses queue in the dispatcher? // signaling to any waiting clients that their response is available this->response->response_time = difftime( time(NULL), this->start_time ); m_client->m_disp->send_response( this->response ); //fprintf( stderr, "DONE: (%s/%s) => (%d), body(%d): '%s'\n", url.c_str(), url.c_str(), rc, (int)this->response, this->response->body.c_str() ); } }