#include <ext/hash_map> #include <algorithm> #include "ev_dispatch.h" #include "ev_http.h" #include <sys/select.h> #include <errno.h> // NOTE: // this test could be written to use libev and it's event loop, however because this // library may be used in ruby it might be desireable to have a select example that could // be easy used within ruby language // // NOTE: // because this example uses a pipe there only about 400-500 conncurrent connections can be used. typedef __gnu_cxx::hash_map<int, struct ResponseDelegate* > ResponseTable; using namespace EVD; struct ResponseDelegate { ResponseDelegate(); ~ResponseDelegate(); void set_response( HttpResponse *ptr ); bool init(); void finish(); HttpResponse *m_response; int m_pfd[2]; }; struct ResponseManager { static const int READ_SIZE = 1024; ResponseManager(); ~ResponseManager(); void watch_response( ResponseDelegate *rd ); void reset(); bool process(); private: int retval; fd_set rd, wr, er; struct timeval tv; char READ_BUFFER[READ_SIZE]; int max_fd; std::vector<int> set_fds; ResponseTable response_table; }; ResponseManager::ResponseManager() { reset(); } ResponseManager::~ResponseManager() { } void ResponseManager::reset() { memset(READ_BUFFER, '\0', READ_SIZE); FD_ZERO(&rd); FD_ZERO(&wr); FD_ZERO(&er); tv.tv_sec = 1; tv.tv_usec = 0; max_fd = 0; } void ResponseManager::watch_response( ResponseDelegate *res_del ) { int fd = res_del->m_pfd[0]; FD_SET(fd, &rd); if( fd > max_fd ){ max_fd = fd; } set_fds.push_back( fd ); response_table[ fd ] = res_del; std::sort(set_fds.begin(),set_fds.end()); } bool ResponseManager::process() { int finished_count = 0; while( !set_fds.empty() ) { this->reset(); for( std::vector<int>::iterator i = set_fds.begin(); i != set_fds.end(); ++i ) { int fd = *i; FD_SET(fd, &rd); if( fd > max_fd ) { max_fd = fd; } } retval = select( max_fd+1, &rd, &wr, &er, &tv ); tv.tv_sec = 1; tv.tv_usec = 0; if( retval == 0 ) { printf("select timedout\n"); continue; } if( retval == -1 && errno == EINTR ) { continue; } if( retval < 0 ){ perror("select"); return false; } bool finished_fd = false; do { finished_fd = false; for( std::vector<int>::iterator i = set_fds.begin(); i != set_fds.end(); ++i ) { int fd = *i; if( FD_ISSET(fd,&rd) ) { ResponseTable::iterator del_loc = response_table.find(fd); ResponseDelegate *del = del_loc->second; if( del ) { retval = read(fd,READ_BUFFER,READ_SIZE); if( retval >= 0 ) { del->m_response->body.append(READ_BUFFER,retval); if( retval == 0 ){ del->finish(); response_table.erase(del_loc); set_fds.erase(i); std::sort(set_fds.begin(),set_fds.end()); std::vector<int>::iterator max_loc = std::max(set_fds.begin(),set_fds.end()); max_fd = *max_loc; FD_CLR(fd,&rd); // remove the file descriptor printf("finished: %d of %d, max_fd: %d\n", ++finished_count, set_fds.size(), max_fd ); finished_fd = true; delete del; break; } } else { perror("read"); return false; } } }// end if } // end for }while( finished_fd ); } return true; } ResponseDelegate::ResponseDelegate() :m_response(NULL) { } ResponseDelegate::~ResponseDelegate() { close(m_pfd[0]); delete m_response; } void ResponseDelegate::set_response( HttpResponse *ptr ) { m_response = ptr; } bool ResponseDelegate::init() { if( pipe(m_pfd) ) { perror("pipe"); return false; } return true; } void ResponseDelegate::finish() { printf("Response: %s, of %d bytes in %5lf seconds\n", m_response->name.c_str(), m_response->body.length(), m_response->response_time ); } // catch SIGINT to kill process static void SIGINT_handler(int sig) { exit(sig); } static bool make_request( const std::string &url, Dispatch &dispatcher, ResponseManager &rm ) { ResponseDelegate *pres = new ResponseDelegate(); if( !pres->init() ){ delete pres; return false; } // tell the request to stream the response over this file descriptor //HttpRequest *req = new HttpRequest( dispatcher, "http://127.0.0.1:4044/bytes/10/", pfd[1] ); HttpRequest *req = new HttpRequest( dispatcher, "http://127.0.0.1:4044/bytes/10/", pres->m_pfd[1] ); //HttpResponse *res = req->m_response; pres->set_response( req->m_response ); // make the request dispatcher.request( req ); rm.watch_response( pres ); return true; } static void run_tests( Dispatch &dispatcher, int count ) { time_t start_time = time(NULL); int response_count = 0; ResponseManager rm; for( int i = 0; i < 50; ++i ) { make_request( "http://127.0.0.1:4044/bytes/10/", dispatcher, rm ); } rm.process(); } int main(int argc, char **argv) { Dispatch dispatcher; if (signal(SIGINT, SIGINT_handler) == SIG_ERR) { printf("SIGINT install error\n"); exit(1); } if( !dispatcher.start() ){ fprintf( stderr, "Failed to start up dispatcher\n" ); return 1; } printf( "dispatcher thread running...\n" ); struct timeval start_time; Timer::current_time(&start_time); run_tests( dispatcher, 10 ); printf( "total time: %.5f seconds\n", Timer::elapsed_time( &start_time ) ); dispatcher.stop(); return 0; }