#include "config.h" #include "ev_http.h" namespace EVD { #define DEBUG #ifdef DEBUG #include #define ASSERT_IS_D_THRAD(id)\ {\ pthread_t tid = pthread_self();\ assert( pthread_equal(tid, id) );\ } #else #define ASSERT_IS_D_THRAD #endif static void multi_error_report(CURLMcode rc) { switch(rc) { case CURLM_CALL_MULTI_PERFORM: printf("curlm:perform: %s\n", curl_multi_strerror(rc) ); break; case CURLM_OK: // printf("curlm:ok\n"); break; case CURLM_BAD_HANDLE: printf("curlm:bad handle: %s\n", curl_multi_strerror(rc) ); break; case CURLM_BAD_EASY_HANDLE: printf("curlm:bad easy handle: %s\n", curl_multi_strerror(rc) ); break; case CURLM_OUT_OF_MEMORY: printf("curlm:oh shit out of memory: %s\n", curl_multi_strerror(rc) ); break; case CURLM_INTERNAL_ERROR: printf("curlm:oh shit internal error: %s\n", curl_multi_strerror(rc) ); break; case CURLM_BAD_SOCKET: printf("curlm:bad socket: %s\n", curl_multi_strerror(rc) ); break; case CURLM_UNKNOWN_OPTION: printf("curlm:unknown option: %s\n", curl_multi_strerror(rc) ); break; } } void HttpClient::SocketInfo::response_cb(struct ev_loop *loop, struct ev_io *w, int revents) { CURLMcode rc; HttpClient::SocketInfo *res = ((HttpClient::SocketInfo*)w->data); HttpClient *client = res->m_client; int ev_bitmask = 0; ASSERT_IS_D_THRAD(client->m_disp->get_thread_id()); do { ev_bitmask |= ((revents & EV_READ)?CURL_CSELECT_IN:0) | ((revents & EV_WRITE)?CURL_CSELECT_OUT:0); rc = curl_multi_socket_action( client->m_handle, w->fd, ev_bitmask, &(client->m_active) ); } while( rc == CURLM_CALL_MULTI_PERFORM ); client->check_handles(); } HttpClient::SocketInfo::SocketInfo( HttpClient *client, curl_socket_t sock, CURL *e, int action ) : m_action(action), m_timeout(0), m_watcher(0), m_handle(e), m_client( client ) { m_watcher = (struct ev_io*)calloc(1,sizeof(struct ev_io)); int kind = (action&CURL_POLL_IN?EV_READ:0)|(action&CURL_POLL_OUT?EV_WRITE:0); m_watcher->data = this; ev_io_init( m_watcher, response_cb, sock, kind ); ev_io_start( m_client->m_disp->get_loop(), m_watcher ); } void HttpClient::SocketInfo::set_sock( curl_socket_t sock, CURL *e, int action ) { int kind = (action&CURL_POLL_IN?EV_READ:0)|(action&CURL_POLL_OUT?EV_WRITE:0)|(action&CURL_POLL_INOUT?EV_READ|EV_WRITE:0); ev_io_stop( m_client->m_disp->get_loop(), m_watcher ); ev_io_set( m_watcher, sock, kind ); ev_io_start( m_client->m_disp->get_loop(), m_watcher ); } void HttpClient::SocketInfo::finish() { // printf(" sock finish: 0x%X\n", this ); ev_io_stop( m_client->m_disp->get_loop(), m_watcher ); } HttpClient::SocketInfo::~SocketInfo() { free(m_watcher); } // CURLMOPT_SOCKETFUNCTION int HttpClient::sock_cb(CURL *e, curl_socket_t sock, int action, void *cbp, void *sockp) { HttpClient *client = (HttpClient*)cbp; HttpClient::SocketInfo *sockinfo = (HttpClient::SocketInfo *)sockp; ASSERT_IS_D_THRAD(client->m_disp->get_thread_id()); if( !sockinfo ) { sockinfo = new HttpClient::SocketInfo( client, sock, e, action ); curl_multi_assign(client->m_handle, sock, sockinfo); // printf( " new sock: 0x%X\n", sockinfo ); } switch( action ){ case CURL_POLL_REMOVE: // (4) unregister // make sure we clear it out //printf( " del sock: 0x%X\n", sockinfo ); curl_multi_assign(client->m_handle, sock, NULL); sockinfo->finish(); delete sockinfo; client->check_handles(); break; case CURL_POLL_NONE: // (0) register, not interested in readiness (yet) // printf("none\n"); break; case CURL_POLL_IN: // (1) register, interested in read readiness // printf("in\n"); case CURL_POLL_OUT: // (2) register, interested in write readiness // printf("out\n"); case CURL_POLL_INOUT: // (3) register, interested in both read and write readiness // printf("inout\n"); default: sockinfo->set_sock( sock, e, action ); client->check_handles(); break; } return 0; } // called on the event loop thread void HttpClient::timeout_cb(struct ev_loop *loop, struct ev_timer *w, int revents) { CURLMcode rc; HttpClient *client = (HttpClient*)w->data; // printf( "timeout\n" ); do { rc = curl_multi_socket(client->m_handle, CURL_SOCKET_TIMEOUT, &(client->m_active)); } while( rc == CURLM_CALL_MULTI_PERFORM ); client->check_handles(); } int HttpClient::update_timeout_cb(CURLM *multi, long timeout_ms, void *userp) { //struct timeval timeout; //timeout.tv_sec = timeout_ms/1000; //timeout.tv_usec = (timeout_ms%1000)*1000; HttpClient *client = (HttpClient*)userp; // update the timer ev_tstamp timeout = /*ev_now(client->m_disp->get_loop()) + */(timeout_ms * 0.001); if( timeout_ms == 0 ) { timeout = 0.1; // enforce always timeout } if( timeout < 0.05 ) { timeout = 0.05; // never let it go below this } // printf( "timeout set %lf\n", timeout ); if( client->m_timer_set ) { // printf( "timeout again\n" ); ev_timer_stop( client->m_disp->get_loop(), &(client->m_timer) ); ev_timer_init( &(client->m_timer), timeout_cb, timeout, timeout ); ev_timer_start( client->m_disp->get_loop(), &(client->m_timer) ); } else { //printf( "start up http client timer\n" ); ev_timer_init( &(client->m_timer), timeout_cb, timeout, timeout ); client->m_timer.data = client; ev_timer_start( client->m_disp->get_loop(), &(client->m_timer) ); client->m_timer_set = true; } return 0; } HttpClient::HttpClient( Dispatch *disp ) : m_active(0), m_handle( curl_multi_init() ), m_disp(disp), m_timer_set(false) { memset(&m_timer, 0, sizeof(struct ev_timer)); curl_multi_setopt( m_handle, CURLMOPT_SOCKETFUNCTION, sock_cb ); curl_multi_setopt( m_handle, CURLMOPT_SOCKETDATA, this ); curl_multi_setopt( m_handle, CURLMOPT_TIMERFUNCTION, update_timeout_cb ); curl_multi_setopt( m_handle, CURLMOPT_TIMERDATA, this ); } void HttpClient::stop() { ev_timer_stop(m_disp->get_loop(), &m_timer); } HttpClient::~HttpClient() { curl_multi_cleanup( m_handle ); m_handle = NULL; } void HttpClient::check_handles() { //see: http://curl.haxx.se/lxr/source/docs/examples/ghiper.c HttpRequest *req; CURL *easy; int msgs_left; CURLcode rc; CURLMsg *msg; do { easy = NULL; //printf( "check handle messages\n" ); while( (msg = curl_multi_info_read( m_handle, &msgs_left )) ) { // printf( " handle info: %d, left: %d\n", msg->msg, msgs_left ); if( msg->msg == CURLMSG_DONE ) { easy = msg->easy_handle; rc = msg->data.result; break; } } // it's unclear whether it's okay to remove a curl handle while looping through the // handles above, to avoid this we break out and loop again if( easy ) { curl_easy_getinfo( easy, CURLINFO_PRIVATE, &req ); //printf( " req %s finished\n", req->url.c_str() ); req->finish(rc); delete req; } } while( easy ); if( msgs_left == 0 && m_timer_set ) { // stop the event timer //printf( "stop http client timer, no pending requests\n" ); ev_timer_stop( m_disp->get_loop(), &m_timer ); m_timer_set = false; } } // CURLOPT_WRITEFUNCTION size_t HttpRequest::write_cb(void *ptr, size_t size, size_t nmemb, void *data) { size_t realsize = size * nmemb; HttpRequest *req = (HttpRequest *)data; req->m_response->write( ptr, realsize, size, nmemb ); return realsize; } // CURLOPT_HEADERFUNCTION size_t HttpRequest::header_write_cb(void *ptr, size_t size, size_t nmemb, void *data) { size_t realsize = size * nmemb; HttpRequest *req = (HttpRequest *)data; req->m_response->write_header( ptr, realsize, size, nmemb ); return realsize; } // CURLOPT_PROGRESSFUNCTION int HttpRequest::prog_cb(void *p, double dltotal, double dlnow, double ult, double uln) { HttpRequest *req = (HttpRequest *)p; //fprintf(stderr, "Progress: %s (%g/%g)\n", req->url.c_str(), dlnow, dltotal); return 0; } void HttpRequest::init_curl() { curl_easy_setopt(m_handle, CURLOPT_URL, url.c_str()); curl_easy_setopt(m_handle, CURLOPT_HEADERFUNCTION, header_write_cb); curl_easy_setopt(m_handle, CURLOPT_HEADERDATA, this); curl_easy_setopt(m_handle, CURLOPT_WRITEFUNCTION, write_cb); curl_easy_setopt(m_handle, CURLOPT_WRITEDATA, this); curl_easy_setopt(m_handle, CURLOPT_VERBOSE, 0); curl_easy_setopt(m_handle, CURLOPT_ERRORBUFFER, m_error); curl_easy_setopt(m_handle, CURLOPT_PRIVATE, this); curl_easy_setopt(m_handle, CURLOPT_NOPROGRESS, 0); curl_easy_setopt(m_handle, CURLOPT_PROGRESSFUNCTION, prog_cb); curl_easy_setopt(m_handle, CURLOPT_PROGRESSDATA, this); } HttpRequest::HttpRequest( Dispatch &dispatch, const std::string &url, int fd ) : Request( 0, url ), m_response(new HttpResponse(url,fd)), m_handle(curl_easy_init()), m_client(dispatch.getHttpClient()) { assert(m_client); init_curl(); } HttpRequest::HttpRequest( Dispatch &dispatch, const std::string &url ) : Request( 0, url ), m_response(new HttpResponse(url)), m_handle(curl_easy_init()), m_client(dispatch.getHttpClient()) { init_curl(); } HttpRequest::~HttpRequest() { curl_easy_cleanup( m_handle ); #ifdef DEBUG m_response = NULL; m_handle = NULL; m_client = NULL; #endif } bool HttpRequest::enable() { CURLMcode rc = curl_multi_add_handle( m_client->m_handle, m_handle ); if( m_client ) { do { rc = curl_multi_socket_all( m_client->m_handle, &m_client->m_active ); } while( rc == CURLM_CALL_MULTI_PERFORM && m_client ); } // XXX: it's really strange that m_client could be 0x0 all of a sudden here... but that's currently what must be happening // in order for the final return code to be CURLM_CALL_MULTI_PERFORM. if( m_client ) { m_client->check_handles(); } multi_error_report(rc); return (rc == CURLM_OK || rc == CURLM_CALL_MULTI_PERFORM); } void HttpRequest::finish(CURLcode rc) { curl_multi_remove_handle( m_client->m_handle, m_handle ); // add the response object here to the responses queue in the dispatcher? // signaling to any waiting clients that their response is available m_response->response_time = Timer::elapsed_time( &(this->start_time) ); m_response->finish( m_client, rc ); //fprintf( stderr, "DONE: (%s/%s) => (%d), body(%d): '%s'\n", url.c_str(), url.c_str(), rc, (int)m_response, m_response->body.c_str() ); } void HttpRequest::set_key( request_t key ) { Request::set_key(key); m_response->id = key; } void HttpRequest::set_opt( const std::string &key, const std::string &value ) { Request::set_opt(key,value); // convert the key into a curl value // #define CURLOPTTYPE_LONG 0 // #define CURLOPTTYPE_OBJECTPOINT 10000 // #define CURLOPTTYPE_FUNCTIONPOINT 20000 // #define CURLOPTTYPE_OFF_T 30000 std::map key_loopup; // TODO: define this once for each resquest? key_loopup["port"] = CURLOPT_PORT; key_loopup["autoreferer"] = CURLOPT_AUTOREFERER; key_loopup["followlocation"] = CURLOPT_FOLLOWLOCATION; key_loopup["maxredirs"] = CURLOPT_MAXREDIRS; key_loopup["referer"] = CURLOPT_REFERER; key_loopup["useragent"] = CURLOPT_USERAGENT; key_loopup["cookie"] = CURLOPT_COOKIE; key_loopup["post"] = CURLOPT_POSTFIELDS; std::map::iterator loc = key_loopup.find(key); if( loc != key_loopup.end() ){ CURLoption val_type = loc->second; if( val_type >= CURLOPTTYPE_LONG && val_type < CURLOPTTYPE_OBJECTPOINT ) { long val = atoi(value.c_str()); curl_easy_setopt( m_handle, val_type, val ); } else if( val_type >= CURLOPTTYPE_OBJECTPOINT && val_type < CURLOPTTYPE_FUNCTIONPOINT ) { if( key == "post" ) { printf( "set %s as str, with value: %s\n", key.c_str(), value.c_str() ); // enable HTTP POST curl_easy_setopt( m_handle, CURLOPT_POST, 1 ); // set the buffer size to copy curl_easy_setopt( m_handle, CURLOPT_POSTFIELDSIZE, value.length() ); // copy the buffer curl_easy_setopt( m_handle, CURLOPT_COPYPOSTFIELDS, value.c_str() ); // printf("setup easy handle to perform a post\n"); } else { curl_easy_setopt( m_handle, val_type, value.c_str() ); } } } } HttpResponse::HttpResponse( const std::string &url ) : Response(url), m_fd(-1) { } HttpResponse::HttpResponse( const std::string &url, int fd ) : Response(url), m_fd(fd) { } void HttpResponse::write( void *ptr, size_t realsize, size_t size, size_t nmemb ) { // printf( " write: %s, %d\n", this->name.c_str(), realsize ); if( m_fd == -1 ) { body.append((const char*)ptr,realsize); } else { ::write( m_fd, ptr, realsize ); } //fwrite( ptr, size, nmemb, stdout ); } void HttpResponse::write_header( void *ptr, size_t realsize, size_t size, size_t nmemb ) { m_header.append((const char*)ptr,realsize); //fwrite( ptr, size, nmemb, stdout ); } void HttpResponse::finish( HttpClient *client, CURLcode rc ) { // printf( " finish: %s\n", this->name.c_str() ); if( m_fd == -1 ) { client->m_disp->send_response( this ); } else { close( this->m_fd ); } } }