src/ebb.c in ebb-0.0.4 vs src/ebb.c in ebb-0.1.0

- old
+ new

@@ -27,11 +27,10 @@ #include "ebb.h" #define min(a,b) (a < b ? a : b) #define ramp(a) (a > 0 ? a : 0) -static int server_socket(const int port); static int server_socket_unix(const char *path, int access_mask); void env_add(ebb_client *client, const char *field, int flen, const char *value, int vlen) { if(client->env_size >= EBB_MAX_ENV) { @@ -59,10 +58,11 @@ client->env[client->env_size].value = value; client->env[client->env_size].value_length = vlen; client->env_size += 1; } + void http_field_cb(void *data, const char *field, size_t flen, const char *value, size_t vlen) { ebb_client *client = (ebb_client*)(data); assert(field != NULL); assert(value != NULL); @@ -114,43 +114,38 @@ void content_length_cb(void *data, const char *at, size_t length) { ebb_client *client = (ebb_client*)(data); env_add_const(client, EBB_CONTENT_LENGTH, at, length); - - /* i hate c. */ - char buf[20]; - strncpy(buf, at, length); - buf[length] = '\0'; - client->content_length = atoi(buf); + /* atoi_length - why isn't this in the statndard library? i hate c */ + assert(client->content_length == 0); + int i, mult; + for(mult=1, i=length-1; i>=0; i--, mult*=10) + client->content_length += (at[i] - '0') * mult; } -const char* localhost_str = "0.0.0.0"; -void dispatch(ebb_client *client) +static void dispatch(ebb_client *client) { ebb_server *server = client->server; if(client->open == FALSE) return; /* Set the env variables */ if(server->port) { - env_add_const(client, EBB_SERVER_NAME - , localhost_str - , 7 - ); env_add_const(client, EBB_SERVER_PORT , server->port , strlen(server->port) ); } + client->in_use = TRUE; server->request_cb(client, server->request_cb_data); } -void on_timeout(struct ev_loop *loop, ev_timer *watcher, int revents) +static void on_timeout(struct ev_loop *loop, ev_timer *watcher, int revents) { ebb_client *client = (ebb_client*)(watcher->data); assert(client->server->loop == loop); assert(&(client->timeout_watcher) == watcher); @@ -162,11 +157,11 @@ } #define client_finished_parsing http_parser_is_finished(&client->parser) #define total_request_size (client->content_length + client->parser.nread) -void* read_body_into_file(void *_client) +static void* read_body_into_file(void *_client) { ebb_client *client = (ebb_client*)_client; static unsigned int id; FILE *tmpfile; @@ -197,12 +192,10 @@ return NULL; } written += r; } - // g_debug("wrote request header to file. written: %d, content_length: %d", written, client->content_length); - int bufsize = 5*1024; char buffer[bufsize]; size_t received; while(written < client->content_length) { received = recv(client->fd @@ -234,26 +227,27 @@ ebb_client_close(client); return NULL; } -void on_readable(struct ev_loop *loop, ev_io *watcher, int revents) +static void on_client_readable(struct ev_loop *loop, ev_io *watcher, int revents) { ebb_client *client = (ebb_client*)(watcher->data); + assert(client->in_use == FALSE); assert(client->open); assert(client->server->open); assert(client->server->loop == loop); assert(&client->read_watcher == watcher); ssize_t read = recv( client->fd , client->request_buffer + client->read , EBB_BUFFERSIZE - client->read , 0 ); - if(read < 0) goto error; /* XXX is this the right action to take for read==0 ? */ - if(read == 0) return; + if(read < 0) goto error; + if(read == 0) goto error; /* XXX is this the right action to take for read==0 ? */ client->read += read; ev_timer_again(loop, &client->timeout_watcher); if(client->read == EBB_BUFFERSIZE) goto error; @@ -286,46 +280,14 @@ error: if(read < 0) g_message("Error recving data: %s", strerror(errno)); ebb_client_close(client); } -void on_request(struct ev_loop *loop, ev_io *watcher, int revents) +static client_init(ebb_server *server, ebb_client *client) { - ebb_server *server = (ebb_server*)(watcher->data); - assert(server->open); - assert(server->loop == loop); - assert(&server->request_watcher == watcher); - - if(EV_ERROR & revents) { - g_message("on_request() got error event, closing server."); - ebb_server_unlisten(server); - return; - } - /* Now we're going to initialize the client - * and set up her callbacks for read and write - * the client won't get passed back to the user, however, - * until the request is complete and parsed. - */ - int i; - ebb_client *client; - /* Get next availible peer */ - for(i=0; i < EBB_MAX_CLIENTS; i++) - if(!server->clients[i].open) { - client = &(server->clients[i]); - break; - } - if(client == NULL) { - g_message("Too many peers. Refusing connections."); - return; - } - + assert(client->in_use == FALSE); #ifdef DEBUG - int count = 0; - for(i = 0; i < EBB_MAX_CLIENTS; i++) - if(server->clients[i].open) count += 1; - g_debug("%d open connections", count); - /* does ragel fuck up if request buffer isn't null? */ for(i=0; i< EBB_BUFFERSIZE; i++) client->request_buffer[i] = 'A'; #endif @@ -333,11 +295,16 @@ client->server = server; /* DO SOCKET STUFF */ socklen_t len; client->fd = accept(server->fd, (struct sockaddr*)&(server->sockaddr), &len); - assert(client->fd >= 0); + if(client->fd < 0) { + perror("accept()"); + client->open = FALSE; + return; + } + int flags = fcntl(client->fd, F_GETFL, 0); assert(0 <= fcntl(client->fd, F_SETFL, flags | O_NONBLOCK)); /* INITIALIZE http_parser */ http_parser_init(&(client->parser)); @@ -350,32 +317,75 @@ client->parser.query_string = query_string_cb; client->parser.http_version = http_version_cb; client->parser.content_length = content_length_cb; /* OTHER */ - client->env_size = 0; client->read = client->nread_from_body = 0; client->response_buffer->len = 0; /* see note in ebb_client_close */ client->content_length = 0; + if(client->request_buffer == NULL) { + client->request_buffer = (char*)malloc(EBB_BUFFERSIZE); + } - client->status_sent = FALSE; - client->headers_sent = FALSE; - client->body_sent = FALSE; + client->status_written = FALSE; + client->headers_written = FALSE; + client->body_written = FALSE; + client->began_transmission = FALSE; /* SETUP READ AND TIMEOUT WATCHERS */ client->read_watcher.data = client; - ev_init(&client->read_watcher, on_readable); + ev_init(&client->read_watcher, on_client_readable); ev_io_set(&client->read_watcher, client->fd, EV_READ | EV_ERROR); ev_io_start(server->loop, &client->read_watcher); client->timeout_watcher.data = client; ev_timer_init(&client->timeout_watcher, on_timeout, EBB_TIMEOUT, EBB_TIMEOUT); ev_timer_start(server->loop, &client->timeout_watcher); } +static void on_request(struct ev_loop *loop, ev_io *watcher, int revents) +{ + ebb_server *server = (ebb_server*)(watcher->data); + assert(server->open); + assert(server->loop == loop); + assert(&server->request_watcher == watcher); + + if(EV_ERROR & revents) { + g_message("on_request() got error event, closing server."); + ebb_server_unlisten(server); + return; + } + /* Now we're going to initialize the client + * and set up her callbacks for read and write + * the client won't get passed back to the user, however, + * until the request is complete and parsed. + */ + int i; + ebb_client *client; + /* Get next availible peer */ + for(i=0; i < EBB_MAX_CLIENTS; i++) + if(!server->clients[i].in_use && !server->clients[i].open) { + client = &(server->clients[i]); + break; + } + if(client == NULL) { + g_message("Too many peers. Refusing connections."); + return; + } + +#ifdef DEBUG + int count = 0; + for(i = 0; i < EBB_MAX_CLIENTS; i++) + if(server->clients[i].open) count += 1; + g_debug("%d open connections", count); +#endif + + client_init(server, client); +} + ebb_server* ebb_server_alloc() { ebb_server *server = g_new0(ebb_server, 1); return server; } @@ -386,12 +396,16 @@ , ebb_request_cb request_cb , void *request_cb_data ) { int i; - for(i=0; i < EBB_MAX_CLIENTS; i++) + for(i=0; i < EBB_MAX_CLIENTS; i++) { + server->clients[i].request_buffer = NULL; server->clients[i].response_buffer = g_string_new(""); + server->clients[i].open = FALSE; + server->clients[i].in_use = FALSE; + } server->request_cb = request_cb; server->request_cb_data = request_cb_data; server->loop = loop; server->open = FALSE; @@ -429,45 +443,87 @@ unlink(server->socketpath); server->open = FALSE; } } - -void ebb_server_listen(ebb_server *server) +int ebb_server_listen_on_port(ebb_server *server, const int port) { - int r = listen(server->fd, EBB_MAX_CLIENTS); - assert(r >= 0); + int sfd = -1; + struct linger ling = {0, 0}; + struct sockaddr_in addr; + int flags = 1; + + if ((sfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { + perror("socket()"); + goto error; + } + + flags = fcntl(sfd, F_GETFL, 0); + if(fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) { + perror("setting O_NONBLOCK"); + goto error; + } + + flags = 1; + setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)); + setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)); + setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)); + setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)); + + /* + * the memset call clears nonstandard fields in some impementations + * that otherwise mess things up. + */ + memset(&addr, 0, sizeof(addr)); + + addr.sin_family = AF_INET; + addr.sin_port = htons(port); + addr.sin_addr.s_addr = htonl(INADDR_ANY); + + if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) { + perror("bind()"); + goto error; + } + if (listen(sfd, EBB_MAX_CLIENTS) < 0) { + perror("listen()"); + goto error; + } + server->fd = sfd; + server->port = malloc(sizeof(char)*8); /* for easy access to the port */ + sprintf(server->port, "%d", port); assert(server->open == FALSE); server->open = TRUE; server->request_watcher.data = server; ev_init (&server->request_watcher, on_request); ev_io_set (&server->request_watcher, server->fd, EV_READ | EV_ERROR); ev_io_start (server->loop, &server->request_watcher); + + return server->fd; +error: + if(sfd > 0) close(sfd); + return -1; } -int ebb_server_listen_on_port(ebb_server *server, const int port) +int ebb_server_listen_on_socket(ebb_server *server, const char *socketpath) { - int fd = server_socket(port); - if(fd < 0) return 0; - server->port = malloc(sizeof(char)*8); /* for easy access to the port */ - sprintf(server->port, "%d", port); - server->fd = fd; - ebb_server_listen(server); - return fd; + // int fd = server_socket_unix(socketpath, 0755); + // if(fd < 0) return 0; + // server->socketpath = strdup(socketpath); + // server->fd = fd; + // server_listen(server); + // return fd; } -int ebb_server_listen_on_socket(ebb_server *server, const char *socketpath) +void ebb_client_release(ebb_client *client) { - int fd = server_socket_unix(socketpath, 0755); - if(fd < 0) return 0; - server->socketpath = strdup(socketpath); - server->fd = fd; - ebb_server_listen(server); - return fd; + assert(client->in_use); + client->in_use = FALSE; + if(client->written == client->response_buffer->len) + ebb_client_close(client); } void ebb_client_close(ebb_client *client) { @@ -492,17 +548,22 @@ client->open = FALSE; } } -void on_client_writable(struct ev_loop *loop, ev_io *watcher, int revents) +static void on_client_writable(struct ev_loop *loop, ev_io *watcher, int revents) { ebb_client *client = (ebb_client*)(watcher->data); ssize_t sent; + assert(client->status_written); + assert(client->headers_written); + assert(client->began_transmission); + if(EV_ERROR & revents) { g_message("on_client_writable() got error event, closing peer"); + ebb_client_close(client); return; } //if(client->written != 0) // g_debug("total written: %d", (int)(client->written)); @@ -516,70 +577,87 @@ #ifdef DEBUG g_message("Error writing: %s", strerror(errno)); #endif ebb_client_close(client); return; + } else if(sent == 0) { + g_message("Sent zero bytes? Closing connection"); + ebb_client_close(client); } client->written += sent; assert(client->written <= client->response_buffer->len); //g_message("wrote %d bytes. total: %d", (int)sent, (int)(client->written)); ev_timer_again(loop, &(client->timeout_watcher)); - if(client->written == client->response_buffer->len) - ebb_client_close(client); + if(client->written == client->response_buffer->len) { + ev_io_stop(loop, watcher); + if(client->body_written) + ebb_client_close(client); + } } void ebb_client_write_status(ebb_client *client, int status, const char *human_status) { - assert(client->status_sent == FALSE); + assert(client->in_use); + if(!client->open) return; + assert(client->status_written == FALSE); g_string_append_printf( client->response_buffer , "HTTP/1.1 %d %s\r\n" , status , human_status ); - client->status_sent = TRUE; + client->status_written = TRUE; } void ebb_client_write_header(ebb_client *client, const char *field, const char *value) { - assert(client->status_sent == TRUE); - assert(client->headers_sent == FALSE); + assert(client->in_use); + if(!client->open) return; + assert(client->status_written == TRUE); + assert(client->headers_written == FALSE); g_string_append_printf( client->response_buffer , "%s: %s\r\n" , field , value ); } void ebb_client_write(ebb_client *client, const char *data, int length) { + assert(client->in_use); + if(!client->open) return; g_string_append_len(client->response_buffer, data, length); + if(client->began_transmission) { + /* restart the watcher if we're streaming */ + ev_io_start(client->server->loop, &client->write_watcher); + } } -void ebb_client_finished(ebb_client *client) +void ebb_client_begin_transmission(ebb_client *client) { - assert(client->open); - assert(FALSE == ev_is_active(&(client->write_watcher))); + assert(client->in_use); + if(!client->open) return; + assert(FALSE == ev_is_active(&client->write_watcher)); - /* assure the socket is still in non-blocking mode - * in the ruby binding, for example, i change this flag - */ + /* assure the socket is still in non-blocking mode */ int flags = fcntl(client->fd, F_GETFL, 0); if(0 > fcntl(client->fd, F_SETFL, flags | O_NONBLOCK)) { perror("fcntl()"); ebb_client_close(client); return; } + client->headers_written = TRUE; + client->began_transmission = TRUE; client->written = 0; client->write_watcher.data = client; ev_init (&(client->write_watcher), on_client_writable); ev_io_set (&(client->write_watcher), client->fd, EV_WRITE | EV_ERROR); - ev_io_start(client->server->loop, &(client->write_watcher)); + ev_io_start(client->server->loop, &client->write_watcher); } /* pass an allocated buffer and the length to read. this function will try to * fill the buffer with that length of data read from the body of the request. @@ -587,11 +665,12 @@ */ int ebb_client_read(ebb_client *client, char *buffer, int length) { size_t read; - assert(client->open); + assert(client->in_use); + if(!client->open) return -1; assert(client_finished_parsing); if(client->upload_file) { read = fread(buffer, 1, length, client->upload_file); /* TODO error checking! */ @@ -608,56 +687,9 @@ return read; } } /* The following socket creation routines are modified and stolen from memcached */ - -static int server_socket(const int port) { - int sfd; - struct linger ling = {0, 0}; - struct sockaddr_in addr; - int flags =1; - - if ((sfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) { - perror("socket()"); - return -1; - } - - if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 || - fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) { - perror("setting O_NONBLOCK"); - close(sfd); - return -1; - } - - flags = 1; - setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags)); - setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags)); - setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling)); - setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags)); - - /* - * the memset call clears nonstandard fields in some impementations - * that otherwise mess things up. - */ - memset(&addr, 0, sizeof(addr)); - - addr.sin_family = AF_INET; - addr.sin_port = htons(port); - addr.sin_addr.s_addr = htonl(INADDR_ANY); - - if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) { - perror("bind()"); - close(sfd); - return -1; - } - if (listen(sfd, EBB_MAX_CLIENTS) == -1) { - perror("listen()"); - close(sfd); - return -1; - } - return sfd; -} static int server_socket_unix(const char *path, int access_mask) { int sfd; struct linger ling = {0, 0}; \ No newline at end of file