#include <ext/hash_map>
#include <algorithm>
#include "ev_dispatch.h"
#include "ev_http.h"
#include <sys/select.h>
#include <errno.h>
#include <unistd.h>

// NOTE:
// this test could be written to use libev and its event loop; however, because this
// library may be used from Ruby, it is desirable to have a select() example that can
// easily be used from within the Ruby language
//
// NOTE:
// because this example uses a pipe for each request, only about 400-500 concurrent connections can be used.

// Lookup table from a pipe read-end file descriptor to the delegate that
// collects the response arriving on that descriptor.
typedef __gnu_cxx::hash_map<int, struct ResponseDelegate* > ResponseTable;

// Pull the EVD namespace into file scope.
using namespace EVD;

// Pairs one HttpResponse with the pipe its data is streamed through.
//
// init() creates the pipe, set_response() attaches the response object, and
// finish() prints a summary once the response is complete.
//
// NOTE(review): the destructor closes m_pfd[0] and deletes m_response, but the
// type is still copyable; copying an instance would release both twice.
struct ResponseDelegate {

  // Starts with m_response set to NULL.
  ResponseDelegate();
  // Closes m_pfd[0] and deletes m_response.
  ~ResponseDelegate();

  // Stores ptr in m_response; the destructor deletes it later.
  void set_response( HttpResponse *ptr );

  // Creates the pipe in m_pfd. Returns false (after perror) when pipe() fails.
  bool init();

  // Prints the response name, body length and response time to stdout.
  void finish();

  // Response being filled in; deleted by the destructor.
  HttpResponse *m_response;
  // Pipe descriptors from pipe(): [0] is read by ResponseManager,
  // [1] is handed to the HttpRequest that produces the data.
  int m_pfd[2];

};

// Watches the read ends of ResponseDelegate pipes with select() and appends
// whatever arrives to the matching delegate's response body.
struct ResponseManager {
  // Size of READ_BUFFER and the byte count requested from each read().
  static const int READ_SIZE = 1024;
  // Calls reset().
  ResponseManager();
  ~ResponseManager();

  // Registers rd's pipe read end (m_pfd[0]) so process() will monitor it.
  void watch_response( ResponseDelegate *rd );

  // Zeroes the read buffer and the three fd sets, sets the timeout to one
  // second and max_fd to 0.
  void reset();

  // Runs the select() loop until no watched descriptors remain. A delegate is
  // finished and deleted when its descriptor reads 0 bytes.
  // Returns false if select() (other than EINTR) or read() fails.
  bool process();
private:
  // Most recent return value of select() or read().
  int retval;
  // select() sets; only rd ever has descriptors added to it.
  fd_set rd, wr, er;
  // select() timeout.
  struct timeval tv;
  // Scratch buffer filled by read().
  char READ_BUFFER[READ_SIZE];
  // Highest descriptor currently placed in rd.
  int max_fd;
  // Watched descriptors; re-sorted after every insert and erase.
  std::vector<int> set_fds;
  // fd -> delegate that receives the data read from that fd.
  ResponseTable response_table;
};


// A new manager starts from the same state reset() produces: empty fd sets,
// zeroed read buffer, one-second timeout and max_fd of 0.
ResponseManager::ResponseManager()
{
  this->reset();
}
// Releases nothing explicitly.
// NOTE(review): delegates still present in response_table (for example after
// process() returned false) are not freed here.
ResponseManager::~ResponseManager() {}
// Returns the select() bookkeeping to its idle state: no descriptors in any
// set, a one-second timeout, max_fd of 0 and a zero-filled read buffer.
// The list of watched descriptors (set_fds) is left untouched.
void ResponseManager::reset()
{
  max_fd = 0;

  tv.tv_usec = 0;
  tv.tv_sec = 1;

  FD_ZERO(&er);
  FD_ZERO(&wr);
  FD_ZERO(&rd);

  memset(READ_BUFFER, 0, sizeof(READ_BUFFER));
}

// Starts monitoring the read end of res_del's pipe.
// The descriptor is recorded in response_table and set_fds (kept sorted),
// added to the read set, and max_fd is raised if needed.
// NOTE(review): the descriptor is not checked against FD_SETSIZE before FD_SET.
void ResponseManager::watch_response( ResponseDelegate *res_del )
{
  const int read_end = res_del->m_pfd[0];

  response_table[ read_end ] = res_del;

  set_fds.push_back( read_end );
  std::sort( set_fds.begin(), set_fds.end() );

  FD_SET( read_end, &rd );
  max_fd = std::max( max_fd, read_end );
}

// Drains every watched pipe until each one reports end-of-file.
//
// Each pass rebuilds the read set from set_fds, waits in select() (one second
// timeout, set by reset()), then performs exactly one read() per descriptor that
// select() reported readable and appends the bytes to the owning delegate's
// response body. Reading each ready descriptor only once per pass keeps read()
// from blocking on a pipe that was already drained in this pass.
//
// A read() of 0 bytes marks the response complete: the delegate's finish() is
// called, the descriptor is removed from set_fds and response_table, and the
// delegate is deleted.
//
// Returns true once no descriptors remain. Returns false if select() or read()
// fails with anything other than EINTR; the error is reported through perror().
bool ResponseManager::process()
{
  int finished_count = 0;

  while( !set_fds.empty() ) {
    // select() overwrites the fd sets (and may overwrite the timeout), so rebuild them every pass
    this->reset();
    for( std::vector<int>::iterator i = set_fds.begin(); i != set_fds.end(); ++i ) {
      int fd = *i;
      FD_SET(fd, &rd);
      if( fd > max_fd ) { max_fd = fd; }
    }

    retval = select( max_fd+1, &rd, &wr, &er, &tv );

    if( retval == 0 ) { printf("select timedout\n"); continue; }
    if( retval == -1 && errno == EINTR ) { continue; }
    if( retval < 0 ){ perror("select"); return false; }

    // Single sweep over the watched descriptors; erase() hands back the next
    // valid iterator so finished entries can be removed while iterating.
    std::vector<int>::iterator i = set_fds.begin();
    while( i != set_fds.end() ) {
      const int fd = *i;

      if( !FD_ISSET(fd,&rd) ) { ++i; continue; }

      ResponseTable::iterator del_loc = response_table.find(fd);
      ResponseDelegate *del = ( del_loc != response_table.end() ) ? del_loc->second : NULL;
      if( !del ) {
        // No delegate to deliver to: stop watching, otherwise select() would
        // keep reporting this descriptor and the loop would never finish.
        if( del_loc != response_table.end() ) { response_table.erase(del_loc); }
        i = set_fds.erase(i);
        continue;
      }

      retval = read(fd,READ_BUFFER,READ_SIZE);
      if( retval < 0 ) {
        // interrupted by a signal: the data is still pending, pick it up on the next pass
        if( errno == EINTR ) { ++i; continue; }
        perror("read");
        return false;
      }

      if( retval > 0 ) {
        del->m_response->body.append(READ_BUFFER,retval);
        ++i;
        continue;
      }

      // retval == 0: the writer closed its end, the response is complete
      del->finish();
      response_table.erase(del_loc);
      i = set_fds.erase(i);
      // max_element, not std::max: the latter would compare the two iterators
      max_fd = set_fds.empty() ? 0 : *std::max_element(set_fds.begin(),set_fds.end());
      printf("finished: %d of %d, max_fd: %d\n", ++finished_count, static_cast<int>(set_fds.size()), max_fd );
      delete del;
    }
  }

  return true;
}

// Starts with no response attached and both pipe descriptors marked invalid (-1).
// Without this, destroying a delegate whose init() failed or was never called
// would close whatever descriptor number happened to be in the uninitialized array.
ResponseDelegate::ResponseDelegate() :m_response(NULL)
{
  m_pfd[0] = -1;
  m_pfd[1] = -1;
}

// Releases what the delegate holds: the pipe's read end and the response object.
// The write end (m_pfd[1]) is not closed here.
ResponseDelegate::~ResponseDelegate()
{
  const int read_end = m_pfd[0];
  close( read_end );

  delete m_response;
}

// Attaches the response object that incoming data is appended to.
// The pointer is stored as-is; any previously stored pointer is overwritten.
void ResponseDelegate::set_response( HttpResponse *ptr )
{
  this->m_response = ptr;
}

// Creates the pipe used to stream the response and stores both ends in m_pfd.
// Returns true on success; on failure reports the error via perror() and returns false.
bool ResponseDelegate::init()
{
  if( pipe(m_pfd) == 0 ) {
    return true;
  }

  perror("pipe");
  return false;
}

// Prints a one-line summary of the completed response: its name, the number of
// body bytes collected and the response time.
// Does nothing when no response has been attached.
void ResponseDelegate::finish()
{
  if( !m_response ) { return; }

  // length() is cast to unsigned long to match %lu; the previous %d did not
  // match the argument type, which is undefined behaviour for printf.
  printf("Response: %s, of %lu bytes  in %5lf seconds\n",
         m_response->name.c_str(),
         static_cast<unsigned long>( m_response->body.length() ),
         m_response->response_time );
}


// Catch SIGINT to kill the process, using the signal number as the exit status.
// _exit() is used instead of exit(): exit() runs atexit handlers and flushes
// stdio, none of which is async-signal-safe, so calling it from a handler can
// deadlock or corrupt state. As a consequence buffered stdio output is not flushed.
static void  SIGINT_handler(int sig)
{
  _exit(sig);
}

// Starts one HTTP request for `url` whose response is streamed through a pipe.
//
// A ResponseDelegate is created with its own pipe; the write end is given to the
// HttpRequest, the request is handed to `dispatcher`, and the delegate is
// registered with `rm` so ResponseManager::process() reads the other end.
// Once registered, the delegate is deleted by `rm` when its response completes.
//
// Returns false (with nothing left allocated) if the pipe cannot be created,
// true otherwise.
static bool make_request( const std::string &url, Dispatch &dispatcher, ResponseManager &rm )
{
  ResponseDelegate *pres = new ResponseDelegate();

  if( !pres->init() ){ delete pres; return false; }

  // Tell the request to stream the response over the pipe's write end.
  // The URL is passed as a C string, exactly like the literal it replaces, so it
  // binds to the same HttpRequest constructor the hard-coded literal did.
  HttpRequest *req = new HttpRequest( dispatcher, url.c_str(), pres->m_pfd[1] );
  pres->set_response( req->m_response );

  // make the request
  dispatcher.request( req );

  rm.watch_response( pres );

  return true;
}

// Issues a fixed batch of requests against the local test server
// (127.0.0.1:4044) and then blocks in ResponseManager::process() until every
// response has been read to completion or an error stops the loop.
//
// NOTE(review): `count` is not used. The batch size is fixed at 50 while main()
// passes 10; confirm the intended load before wiring the parameter in.
static void run_tests( Dispatch &dispatcher, int count )
{
  (void)count;

  static const int REQUEST_COUNT = 50;

  ResponseManager rm;

  for( int i = 0; i < REQUEST_COUNT; ++i ) {
    make_request( "http://127.0.0.1:4044/bytes/10/", dispatcher, rm );
  }

  rm.process();
}

// Entry point: installs the SIGINT handler, starts the dispatcher, runs the
// request batch and prints the total elapsed time before stopping the dispatcher.
// Exits with status 1 if the handler cannot be installed or the dispatcher
// fails to start, 0 otherwise.
int main(int argc, char **argv)
{
  Dispatch dispatcher;

  // let Ctrl-C terminate the process
  const bool handler_installed = ( signal(SIGINT, SIGINT_handler) != SIG_ERR );
  if( !handler_installed ) {
    printf("SIGINT install error\n");
    exit(1);
  }

  if( !dispatcher.start() ){
    fprintf( stderr, "Failed to start up dispatcher\n" );
    return 1;
  }

  printf( "dispatcher thread running...\n" );

  // wall-clock reference point for the whole batch
  struct timeval began;
  Timer::current_time(&began);

  run_tests( dispatcher, 10 );

  printf( "total time: %.5f seconds\n", Timer::elapsed_time( &began ) );

  dispatcher.stop();

  return 0;
}