src/ebb.c in ebb-0.0.4 vs src/ebb.c in ebb-0.1.0
- old
+ new
@@ -27,11 +27,10 @@
#include "ebb.h"
#define min(a,b) (a < b ? a : b)
#define ramp(a) (a > 0 ? a : 0)
-static int server_socket(const int port);
static int server_socket_unix(const char *path, int access_mask);
void env_add(ebb_client *client, const char *field, int flen, const char *value, int vlen)
{
if(client->env_size >= EBB_MAX_ENV) {
@@ -59,10 +58,11 @@
client->env[client->env_size].value = value;
client->env[client->env_size].value_length = vlen;
client->env_size += 1;
}
+
void http_field_cb(void *data, const char *field, size_t flen, const char *value, size_t vlen)
{
ebb_client *client = (ebb_client*)(data);
assert(field != NULL);
assert(value != NULL);
@@ -114,43 +114,38 @@
void content_length_cb(void *data, const char *at, size_t length)
{
ebb_client *client = (ebb_client*)(data);
env_add_const(client, EBB_CONTENT_LENGTH, at, length);
-
- /* i hate c. */
- char buf[20];
- strncpy(buf, at, length);
- buf[length] = '\0';
- client->content_length = atoi(buf);
+ /* atoi_length - why isn't this in the statndard library? i hate c */
+ assert(client->content_length == 0);
+ int i, mult;
+ for(mult=1, i=length-1; i>=0; i--, mult*=10)
+ client->content_length += (at[i] - '0') * mult;
}
-const char* localhost_str = "0.0.0.0";
-void dispatch(ebb_client *client)
+static void dispatch(ebb_client *client)
{
ebb_server *server = client->server;
if(client->open == FALSE)
return;
/* Set the env variables */
if(server->port) {
- env_add_const(client, EBB_SERVER_NAME
- , localhost_str
- , 7
- );
env_add_const(client, EBB_SERVER_PORT
, server->port
, strlen(server->port)
);
}
+ client->in_use = TRUE;
server->request_cb(client, server->request_cb_data);
}
-void on_timeout(struct ev_loop *loop, ev_timer *watcher, int revents)
+static void on_timeout(struct ev_loop *loop, ev_timer *watcher, int revents)
{
ebb_client *client = (ebb_client*)(watcher->data);
assert(client->server->loop == loop);
assert(&(client->timeout_watcher) == watcher);
@@ -162,11 +157,11 @@
}
#define client_finished_parsing http_parser_is_finished(&client->parser)
#define total_request_size (client->content_length + client->parser.nread)
-void* read_body_into_file(void *_client)
+static void* read_body_into_file(void *_client)
{
ebb_client *client = (ebb_client*)_client;
static unsigned int id;
FILE *tmpfile;
@@ -197,12 +192,10 @@
return NULL;
}
written += r;
}
- // g_debug("wrote request header to file. written: %d, content_length: %d", written, client->content_length);
-
int bufsize = 5*1024;
char buffer[bufsize];
size_t received;
while(written < client->content_length) {
received = recv(client->fd
@@ -234,26 +227,27 @@
ebb_client_close(client);
return NULL;
}
-void on_readable(struct ev_loop *loop, ev_io *watcher, int revents)
+static void on_client_readable(struct ev_loop *loop, ev_io *watcher, int revents)
{
ebb_client *client = (ebb_client*)(watcher->data);
+ assert(client->in_use == FALSE);
assert(client->open);
assert(client->server->open);
assert(client->server->loop == loop);
assert(&client->read_watcher == watcher);
ssize_t read = recv( client->fd
, client->request_buffer + client->read
, EBB_BUFFERSIZE - client->read
, 0
);
- if(read < 0) goto error; /* XXX is this the right action to take for read==0 ? */
- if(read == 0) return;
+ if(read < 0) goto error;
+ if(read == 0) goto error; /* XXX is this the right action to take for read==0 ? */
client->read += read;
ev_timer_again(loop, &client->timeout_watcher);
if(client->read == EBB_BUFFERSIZE) goto error;
@@ -286,46 +280,14 @@
error:
if(read < 0) g_message("Error recving data: %s", strerror(errno));
ebb_client_close(client);
}
-void on_request(struct ev_loop *loop, ev_io *watcher, int revents)
+static client_init(ebb_server *server, ebb_client *client)
{
- ebb_server *server = (ebb_server*)(watcher->data);
- assert(server->open);
- assert(server->loop == loop);
- assert(&server->request_watcher == watcher);
-
- if(EV_ERROR & revents) {
- g_message("on_request() got error event, closing server.");
- ebb_server_unlisten(server);
- return;
- }
- /* Now we're going to initialize the client
- * and set up her callbacks for read and write
- * the client won't get passed back to the user, however,
- * until the request is complete and parsed.
- */
- int i;
- ebb_client *client;
- /* Get next availible peer */
- for(i=0; i < EBB_MAX_CLIENTS; i++)
- if(!server->clients[i].open) {
- client = &(server->clients[i]);
- break;
- }
- if(client == NULL) {
- g_message("Too many peers. Refusing connections.");
- return;
- }
-
+ assert(client->in_use == FALSE);
#ifdef DEBUG
- int count = 0;
- for(i = 0; i < EBB_MAX_CLIENTS; i++)
- if(server->clients[i].open) count += 1;
- g_debug("%d open connections", count);
-
/* does ragel fuck up if request buffer isn't null? */
for(i=0; i< EBB_BUFFERSIZE; i++)
client->request_buffer[i] = 'A';
#endif
@@ -333,11 +295,16 @@
client->server = server;
/* DO SOCKET STUFF */
socklen_t len;
client->fd = accept(server->fd, (struct sockaddr*)&(server->sockaddr), &len);
- assert(client->fd >= 0);
+ if(client->fd < 0) {
+ perror("accept()");
+ client->open = FALSE;
+ return;
+ }
+
int flags = fcntl(client->fd, F_GETFL, 0);
assert(0 <= fcntl(client->fd, F_SETFL, flags | O_NONBLOCK));
/* INITIALIZE http_parser */
http_parser_init(&(client->parser));
@@ -350,32 +317,75 @@
client->parser.query_string = query_string_cb;
client->parser.http_version = http_version_cb;
client->parser.content_length = content_length_cb;
/* OTHER */
-
client->env_size = 0;
client->read = client->nread_from_body = 0;
client->response_buffer->len = 0; /* see note in ebb_client_close */
client->content_length = 0;
+ if(client->request_buffer == NULL) {
+ client->request_buffer = (char*)malloc(EBB_BUFFERSIZE);
+ }
- client->status_sent = FALSE;
- client->headers_sent = FALSE;
- client->body_sent = FALSE;
+ client->status_written = FALSE;
+ client->headers_written = FALSE;
+ client->body_written = FALSE;
+ client->began_transmission = FALSE;
/* SETUP READ AND TIMEOUT WATCHERS */
client->read_watcher.data = client;
- ev_init(&client->read_watcher, on_readable);
+ ev_init(&client->read_watcher, on_client_readable);
ev_io_set(&client->read_watcher, client->fd, EV_READ | EV_ERROR);
ev_io_start(server->loop, &client->read_watcher);
client->timeout_watcher.data = client;
ev_timer_init(&client->timeout_watcher, on_timeout, EBB_TIMEOUT, EBB_TIMEOUT);
ev_timer_start(server->loop, &client->timeout_watcher);
}
+static void on_request(struct ev_loop *loop, ev_io *watcher, int revents)
+{
+ ebb_server *server = (ebb_server*)(watcher->data);
+ assert(server->open);
+ assert(server->loop == loop);
+ assert(&server->request_watcher == watcher);
+
+ if(EV_ERROR & revents) {
+ g_message("on_request() got error event, closing server.");
+ ebb_server_unlisten(server);
+ return;
+ }
+ /* Now we're going to initialize the client
+ * and set up her callbacks for read and write
+ * the client won't get passed back to the user, however,
+ * until the request is complete and parsed.
+ */
+ int i;
+ ebb_client *client;
+ /* Get next availible peer */
+ for(i=0; i < EBB_MAX_CLIENTS; i++)
+ if(!server->clients[i].in_use && !server->clients[i].open) {
+ client = &(server->clients[i]);
+ break;
+ }
+ if(client == NULL) {
+ g_message("Too many peers. Refusing connections.");
+ return;
+ }
+
+#ifdef DEBUG
+ int count = 0;
+ for(i = 0; i < EBB_MAX_CLIENTS; i++)
+ if(server->clients[i].open) count += 1;
+ g_debug("%d open connections", count);
+#endif
+
+ client_init(server, client);
+}
+
ebb_server* ebb_server_alloc()
{
ebb_server *server = g_new0(ebb_server, 1);
return server;
}
@@ -386,12 +396,16 @@
, ebb_request_cb request_cb
, void *request_cb_data
)
{
int i;
- for(i=0; i < EBB_MAX_CLIENTS; i++)
+ for(i=0; i < EBB_MAX_CLIENTS; i++) {
+ server->clients[i].request_buffer = NULL;
server->clients[i].response_buffer = g_string_new("");
+ server->clients[i].open = FALSE;
+ server->clients[i].in_use = FALSE;
+ }
server->request_cb = request_cb;
server->request_cb_data = request_cb_data;
server->loop = loop;
server->open = FALSE;
@@ -429,45 +443,87 @@
unlink(server->socketpath);
server->open = FALSE;
}
}
-
-void ebb_server_listen(ebb_server *server)
+int ebb_server_listen_on_port(ebb_server *server, const int port)
{
- int r = listen(server->fd, EBB_MAX_CLIENTS);
- assert(r >= 0);
+ int sfd = -1;
+ struct linger ling = {0, 0};
+ struct sockaddr_in addr;
+ int flags = 1;
+
+ if ((sfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
+ perror("socket()");
+ goto error;
+ }
+
+ flags = fcntl(sfd, F_GETFL, 0);
+ if(fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
+ perror("setting O_NONBLOCK");
+ goto error;
+ }
+
+ flags = 1;
+ setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
+ setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
+ setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
+ setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
+
+ /*
+ * the memset call clears nonstandard fields in some impementations
+ * that otherwise mess things up.
+ */
+ memset(&addr, 0, sizeof(addr));
+
+ addr.sin_family = AF_INET;
+ addr.sin_port = htons(port);
+ addr.sin_addr.s_addr = htonl(INADDR_ANY);
+
+ if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
+ perror("bind()");
+ goto error;
+ }
+ if (listen(sfd, EBB_MAX_CLIENTS) < 0) {
+ perror("listen()");
+ goto error;
+ }
+ server->fd = sfd;
+ server->port = malloc(sizeof(char)*8); /* for easy access to the port */
+ sprintf(server->port, "%d", port);
assert(server->open == FALSE);
server->open = TRUE;
server->request_watcher.data = server;
ev_init (&server->request_watcher, on_request);
ev_io_set (&server->request_watcher, server->fd, EV_READ | EV_ERROR);
ev_io_start (server->loop, &server->request_watcher);
+
+ return server->fd;
+error:
+ if(sfd > 0) close(sfd);
+ return -1;
}
-int ebb_server_listen_on_port(ebb_server *server, const int port)
+int ebb_server_listen_on_socket(ebb_server *server, const char *socketpath)
{
- int fd = server_socket(port);
- if(fd < 0) return 0;
- server->port = malloc(sizeof(char)*8); /* for easy access to the port */
- sprintf(server->port, "%d", port);
- server->fd = fd;
- ebb_server_listen(server);
- return fd;
+ // int fd = server_socket_unix(socketpath, 0755);
+ // if(fd < 0) return 0;
+ // server->socketpath = strdup(socketpath);
+ // server->fd = fd;
+ // server_listen(server);
+ // return fd;
}
-int ebb_server_listen_on_socket(ebb_server *server, const char *socketpath)
+void ebb_client_release(ebb_client *client)
{
- int fd = server_socket_unix(socketpath, 0755);
- if(fd < 0) return 0;
- server->socketpath = strdup(socketpath);
- server->fd = fd;
- ebb_server_listen(server);
- return fd;
+ assert(client->in_use);
+ client->in_use = FALSE;
+ if(client->written == client->response_buffer->len)
+ ebb_client_close(client);
}
void ebb_client_close(ebb_client *client)
{
@@ -492,17 +548,22 @@
client->open = FALSE;
}
}
-void on_client_writable(struct ev_loop *loop, ev_io *watcher, int revents)
+static void on_client_writable(struct ev_loop *loop, ev_io *watcher, int revents)
{
ebb_client *client = (ebb_client*)(watcher->data);
ssize_t sent;
+ assert(client->status_written);
+ assert(client->headers_written);
+ assert(client->began_transmission);
+
if(EV_ERROR & revents) {
g_message("on_client_writable() got error event, closing peer");
+ ebb_client_close(client);
return;
}
//if(client->written != 0)
// g_debug("total written: %d", (int)(client->written));
@@ -516,70 +577,87 @@
#ifdef DEBUG
g_message("Error writing: %s", strerror(errno));
#endif
ebb_client_close(client);
return;
+ } else if(sent == 0) {
+ g_message("Sent zero bytes? Closing connection");
+ ebb_client_close(client);
}
client->written += sent;
assert(client->written <= client->response_buffer->len);
//g_message("wrote %d bytes. total: %d", (int)sent, (int)(client->written));
ev_timer_again(loop, &(client->timeout_watcher));
- if(client->written == client->response_buffer->len)
- ebb_client_close(client);
+ if(client->written == client->response_buffer->len) {
+ ev_io_stop(loop, watcher);
+ if(client->body_written)
+ ebb_client_close(client);
+ }
}
void ebb_client_write_status(ebb_client *client, int status, const char *human_status)
{
- assert(client->status_sent == FALSE);
+ assert(client->in_use);
+ if(!client->open) return;
+ assert(client->status_written == FALSE);
g_string_append_printf( client->response_buffer
, "HTTP/1.1 %d %s\r\n"
, status
, human_status
);
- client->status_sent = TRUE;
+ client->status_written = TRUE;
}
void ebb_client_write_header(ebb_client *client, const char *field, const char *value)
{
- assert(client->status_sent == TRUE);
- assert(client->headers_sent == FALSE);
+ assert(client->in_use);
+ if(!client->open) return;
+ assert(client->status_written == TRUE);
+ assert(client->headers_written == FALSE);
g_string_append_printf( client->response_buffer
, "%s: %s\r\n"
, field
, value
);
}
void ebb_client_write(ebb_client *client, const char *data, int length)
{
+ assert(client->in_use);
+ if(!client->open) return;
g_string_append_len(client->response_buffer, data, length);
+ if(client->began_transmission) {
+ /* restart the watcher if we're streaming */
+ ev_io_start(client->server->loop, &client->write_watcher);
+ }
}
-void ebb_client_finished(ebb_client *client)
+void ebb_client_begin_transmission(ebb_client *client)
{
- assert(client->open);
- assert(FALSE == ev_is_active(&(client->write_watcher)));
+ assert(client->in_use);
+ if(!client->open) return;
+ assert(FALSE == ev_is_active(&client->write_watcher));
- /* assure the socket is still in non-blocking mode
- * in the ruby binding, for example, i change this flag
- */
+ /* assure the socket is still in non-blocking mode */
int flags = fcntl(client->fd, F_GETFL, 0);
if(0 > fcntl(client->fd, F_SETFL, flags | O_NONBLOCK)) {
perror("fcntl()");
ebb_client_close(client);
return;
}
+ client->headers_written = TRUE;
+ client->began_transmission = TRUE;
client->written = 0;
client->write_watcher.data = client;
ev_init (&(client->write_watcher), on_client_writable);
ev_io_set (&(client->write_watcher), client->fd, EV_WRITE | EV_ERROR);
- ev_io_start(client->server->loop, &(client->write_watcher));
+ ev_io_start(client->server->loop, &client->write_watcher);
}
/* pass an allocated buffer and the length to read. this function will try to
* fill the buffer with that length of data read from the body of the request.
@@ -587,11 +665,12 @@
*/
int ebb_client_read(ebb_client *client, char *buffer, int length)
{
size_t read;
- assert(client->open);
+ assert(client->in_use);
+ if(!client->open) return -1;
assert(client_finished_parsing);
if(client->upload_file) {
read = fread(buffer, 1, length, client->upload_file);
/* TODO error checking! */
@@ -608,56 +687,9 @@
return read;
}
}
/* The following socket creation routines are modified and stolen from memcached */
-
-static int server_socket(const int port) {
- int sfd;
- struct linger ling = {0, 0};
- struct sockaddr_in addr;
- int flags =1;
-
- if ((sfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
- perror("socket()");
- return -1;
- }
-
- if ((flags = fcntl(sfd, F_GETFL, 0)) < 0 ||
- fcntl(sfd, F_SETFL, flags | O_NONBLOCK) < 0) {
- perror("setting O_NONBLOCK");
- close(sfd);
- return -1;
- }
-
- flags = 1;
- setsockopt(sfd, SOL_SOCKET, SO_REUSEADDR, (void *)&flags, sizeof(flags));
- setsockopt(sfd, SOL_SOCKET, SO_KEEPALIVE, (void *)&flags, sizeof(flags));
- setsockopt(sfd, SOL_SOCKET, SO_LINGER, (void *)&ling, sizeof(ling));
- setsockopt(sfd, IPPROTO_TCP, TCP_NODELAY, (void *)&flags, sizeof(flags));
-
- /*
- * the memset call clears nonstandard fields in some impementations
- * that otherwise mess things up.
- */
- memset(&addr, 0, sizeof(addr));
-
- addr.sin_family = AF_INET;
- addr.sin_port = htons(port);
- addr.sin_addr.s_addr = htonl(INADDR_ANY);
-
- if (bind(sfd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
- perror("bind()");
- close(sfd);
- return -1;
- }
- if (listen(sfd, EBB_MAX_CLIENTS) == -1) {
- perror("listen()");
- close(sfd);
- return -1;
- }
- return sfd;
-}
static int server_socket_unix(const char *path, int access_mask) {
int sfd;
struct linger ling = {0, 0};
\ No newline at end of file