ext/buffer/em_buffer.c in astro-em-http-request-0.2.8 vs ext/buffer/em_buffer.c in astro-em-http-request-0.2.9

- old
+ new

@@ -27,19 +27,19 @@ /* How often to scan the pool for old nodes */ #define PURGE_INTERVAL 10 struct buffer { - time_t last_purged_at; + time_t last_purged_at; unsigned size, node_size; struct buffer_node *head, *tail; struct buffer_node *pool_head, *pool_tail; - + }; struct buffer_node { - time_t last_used_at; + time_t last_used_at; unsigned start, end; struct buffer_node *next; unsigned char data[0]; }; @@ -70,11 +70,11 @@ static void buffer_read(struct buffer *buf, char *str, unsigned len); static void buffer_copy(struct buffer *buf, char *str, unsigned len); static int buffer_read_from(struct buffer *buf, int fd); static int buffer_write_to(struct buffer *buf, int fd); -/* +/* * High speed buffering geared towards non-blocking I/O. * * Data is stored in a byte queue implemented as a linked list of equal size * chunks. Since every node in the list is the same size they are easily * memory pooled. Routines are provided for high speed non-blocking reads @@ -92,12 +92,12 @@ rb_define_method(cEm_Buffer, "empty?", Em_Buffer_empty, 0); rb_define_method(cEm_Buffer, "<<", Em_Buffer_append, 1); rb_define_method(cEm_Buffer, "append", Em_Buffer_append, 1); rb_define_method(cEm_Buffer, "prepend", Em_Buffer_prepend, 1); rb_define_method(cEm_Buffer, "read", Em_Buffer_read, -1); - rb_define_method(cEm_Buffer, "to_str", Em_Buffer_to_str, 0); - rb_define_method(cEm_Buffer, "read_from", Em_Buffer_read_from, 1); + rb_define_method(cEm_Buffer, "to_str", Em_Buffer_to_str, 0); + rb_define_method(cEm_Buffer, "read_from", Em_Buffer_read_from, 1); rb_define_method(cEm_Buffer, "write_to", Em_Buffer_write_to, 1); } static VALUE Em_Buffer_allocate(VALUE klass) { @@ -116,11 +116,11 @@ } /** * call-seq: * EventMachine::Buffer.new(size = DEFAULT_NODE_SIZE) -> EventMachine::Buffer - * + * * Create a new EventMachine::Buffer with linked segments of the given size */ static VALUE Em_Buffer_initialize(int argc, VALUE *argv, VALUE self) { VALUE node_size_obj; @@ -145,11 +145,11 @@ } /** * call-seq: * EventMachine::Buffer#clear -> nil - * + * * Clear all data from the EventMachine::Buffer */ static VALUE Em_Buffer_clear(VALUE self) { struct buffer *buf; @@ -161,39 +161,39 @@ } /** * call-seq: * EventMachine::Buffer#size -> Integer - * + * * Return the size of the buffer in bytes */ -static VALUE Em_Buffer_size(VALUE self) +static VALUE Em_Buffer_size(VALUE self) { struct buffer *buf; Data_Get_Struct(self, struct buffer, buf); return INT2NUM(buf->size); } /** * call-seq: * EventMachine::Buffer#empty? -> Boolean - * + * * Is the buffer empty? */ -static VALUE Em_Buffer_empty(VALUE self) +static VALUE Em_Buffer_empty(VALUE self) { struct buffer *buf; Data_Get_Struct(self, struct buffer, buf); - return buf->size > 0 ? Qfalse : Qtrue; + return buf->size > 0 ? Qfalse : Qtrue; } /** * call-seq: * EventMachine::Buffer#append(data) -> String - * + * * Append the given data to the end of the buffer */ static VALUE Em_Buffer_append(VALUE self, VALUE data) { struct buffer *buf; @@ -207,11 +207,11 @@ } /** * call-seq: * EventMachine::Buffer#prepend(data) -> String - * + * * Prepend the given data to the beginning of the buffer */ static VALUE Em_Buffer_prepend(VALUE self, VALUE data) { struct buffer *buf; @@ -224,11 +224,11 @@ } /** * call-seq: * EventMachine::Buffer#read(length = nil) -> String - * + * * Read the specified abount of data from the buffer. If no value * is given the entire contents of the buffer are returned. Any data * read from the buffer is cleared. */ static VALUE Em_Buffer_read(int argc, VALUE *argv, VALUE self) @@ -261,73 +261,82 @@ } /** * call-seq: * EventMachine::Buffer#to_str -> String - * + * * Convert the Buffer to a String. The original buffer is unmodified. */ static VALUE Em_Buffer_to_str(VALUE self) { - VALUE str; - struct buffer *buf; - - Data_Get_Struct(self, struct buffer, buf); - - str = rb_str_new(0, buf->size); - buffer_copy(buf, RSTRING_PTR(str), buf->size); - + VALUE str; + struct buffer *buf; + + Data_Get_Struct(self, struct buffer, buf); + + str = rb_str_new(0, buf->size); + buffer_copy(buf, RSTRING_PTR(str), buf->size); + return str; } /** * call-seq: * EventMachine::Buffer#read_from(io) -> Integer - * + * * Perform a nonblocking read of the the given IO object and fill * the buffer with any data received. The call will read as much * data as it can until the read would block. */ static VALUE Em_Buffer_read_from(VALUE self, VALUE io) { - struct buffer *buf; + struct buffer *buf; #if HAVE_RB_IO_T rb_io_t *fptr; #else OpenFile *fptr; #endif Data_Get_Struct(self, struct buffer, buf); GetOpenFile(rb_convert_type(io, T_FILE, "IO", "to_io"), fptr); rb_io_set_nonblock(fptr); +#ifdef HAVE_RB_IO_FD + return INT2NUM(buffer_read_from(buf, rb_io_fd(io))); +#else return INT2NUM(buffer_read_from(buf, FPTR_TO_FD(fptr))); +#endif } /** * call-seq: * EventMachine::Buffer#write_to(io) -> Integer - * + * * Perform a nonblocking write of the buffer to the given IO object. * As much data as possible is written until the call would block. * Any data which is written is removed from the buffer. */ static VALUE Em_Buffer_write_to(VALUE self, VALUE io) { struct buffer *buf; -#if HAVE_RB_IO_T +#if HAVE_RB_IO_T rb_io_t *fptr; #else OpenFile *fptr; #endif Data_Get_Struct(self, struct buffer, buf); GetOpenFile(rb_convert_type(io, T_FILE, "IO", "to_io"), fptr); rb_io_set_nonblock(fptr); - return INT2NUM(buffer_write_to(buf, FPTR_TO_FD(fptr))); +#ifdef HAVE_RB_IO_FD + return INT2NUM(buffer_read_from(buf, rb_io_fd(io))); +#else + return INT2NUM(buffer_read_from(buf, FPTR_TO_FD(fptr))); +#endif + } /* - * Ruby bindings end here. Below is the actual implementation of + * Ruby bindings end here. Below is the actual implementation of * the underlying data structures. */ /* Create a new buffer */ static struct buffer *buffer_new(void) @@ -336,12 +345,12 @@ buf = (struct buffer *)xmalloc(sizeof(struct buffer)); buf->head = buf->tail = buf->pool_head = buf->pool_tail = 0; buf->size = 0; buf->node_size = DEFAULT_NODE_SIZE; - time(&buf->last_purged_at); - + time(&buf->last_purged_at); + return buf; } /* Clear all data from a buffer */ static void buffer_clear(struct buffer *buf) @@ -357,11 +366,11 @@ buf->head = buf->tail = 0; buf->size = 0; } /* Free a buffer */ -static void buffer_free(struct buffer *buf) +static void buffer_free(struct buffer *buf) { struct buffer_node *tmp; buffer_clear(buf); @@ -379,24 +388,24 @@ { struct buffer_node *cur, *tmp; time_t now; time(&now); - /* Only purge if we've passed the purge interval */ - if(now - buf->last_purged_at < PURGE_INTERVAL) - return; - - buf->last_purged_at = now; + /* Only purge if we've passed the purge interval */ + if(now - buf->last_purged_at < PURGE_INTERVAL) + return; + buf->last_purged_at = now; + while(buf->pool_head && now - buf->pool_head->last_used_at >= MAX_AGE) { tmp = buf->pool_head; buf->pool_head = buf->pool_head->next; free(tmp); } if(!buf->pool_head) - buf->pool_tail = 0; + buf->pool_tail = 0; } /* Create a new buffer_node (or pull one from the memory pool) */ static struct buffer_node *buffer_node_new(struct buffer *buf) { @@ -492,15 +501,15 @@ /* Build links out of the data */ while(len > 0) { nbytes = buf->node_size - buf->tail->end; if(len < nbytes) nbytes = len; - + memcpy(buf->tail->data + buf->tail->end, str, nbytes); - str += nbytes; + str += nbytes; len -= nbytes; - + buf->tail->end += nbytes; if(len > 0) { buf->tail->next = buffer_node_new(buf); buf->tail = buf->tail->next; @@ -539,21 +548,21 @@ static void buffer_copy(struct buffer *buf, char *str, unsigned len) { unsigned nbytes; struct buffer_node *node; - node = buf->head; + node = buf->head; while(node && len > 0) { nbytes = node->end - node->start; if(len < nbytes) nbytes = len; memcpy(str, node->data + node->start, nbytes); str += nbytes; len -= nbytes; if(node->start + nbytes == node->end) - node = node->next; + node = node->next; } } /* Write data from the buffer to a file descriptor */ static int buffer_write_to(struct buffer *buf, int fd) @@ -594,37 +603,37 @@ /* Read data from a file descriptor to a buffer */ /* Append data to the front of the buffer */ static int buffer_read_from(struct buffer *buf, int fd) { - int bytes_read, total_bytes_read = 0; + int bytes_read, total_bytes_read = 0; unsigned nbytes; /* Empty list needs initialized */ if(!buf->head) { buf->head = buffer_node_new(buf); buf->tail = buf->head; } - do { - nbytes = buf->node_size - buf->tail->end; - bytes_read = read(fd, buf->tail->data + buf->tail->end, nbytes); - - if(bytes_read < 1) { - if(errno != EAGAIN) + do { + nbytes = buf->node_size - buf->tail->end; + bytes_read = read(fd, buf->tail->data + buf->tail->end, nbytes); + + if(bytes_read < 1) { + if(errno != EAGAIN) rb_sys_fail("read"); - - return total_bytes_read; - } - - total_bytes_read += bytes_read; - buf->tail->end += nbytes; - buf->size += nbytes; - - if(buf->tail->end == buf->node_size) { + + return total_bytes_read; + } + + total_bytes_read += bytes_read; + buf->tail->end += nbytes; + buf->size += nbytes; + + if(buf->tail->end == buf->node_size) { buf->tail->next = buffer_node_new(buf); buf->tail = buf->tail->next; - } - } while(bytes_read == nbytes); - - return total_bytes_read; + } + } while(bytes_read == nbytes); + + return total_bytes_read; }