ext/posix/mqueue.c in posix-mqueue-0.0.4 vs ext/posix/mqueue.c in posix-mqueue-0.0.5

- old
+ new

@@ -1,7 +1,8 @@ #include <ruby.h> #include <ruby/util.h> +#include <ruby/io.h> #include <mqueue.h> #include <fcntl.h> #include <errno.h> #include <time.h> @@ -87,37 +88,48 @@ if (!RB_TYPE_P(message, T_STRING)) { rb_raise(rb_eTypeError, "Message must be a string"); } + rb_io_wait_writable(data->fd); + // TODO: Custom priority err = mq_send(data->fd, RSTRING_PTR(message), RSTRING_LEN(message), 10); + if(err < 0 && errno == EINTR) { + err = mq_send(data->fd, RSTRING_PTR(message), RSTRING_LEN(message), 10); + } + if (err < 0) { rb_sys_fail("Message sending failed, please consult mq_send(3)"); } return Qtrue; } -VALUE posix_mqueue_timedreceive(VALUE self, VALUE seconds, VALUE nanoseconds) +VALUE posix_mqueue_timedreceive(VALUE self, VALUE args) { int err; mqueue_t* data; size_t buf_size; char *buf; struct timespec timeout; VALUE str; + VALUE seconds = rb_ary_entry(args, 0); + VALUE nanoseconds = rb_ary_entry(args, 1); + if (seconds == Qnil) seconds = INT2FIX(0); + if (nanoseconds == Qnil) nanoseconds = INT2FIX(0); + TypedData_Get_Struct(self, mqueue_t, &mqueue_type, data); if (!RB_TYPE_P(seconds, T_FIXNUM)) { rb_raise(rb_eTypeError, "First argument must be a Fixnum"); } if (!RB_TYPE_P(nanoseconds, T_FIXNUM)) { - rb_raise(rb_eTypeError, "First argument must be a fixnum"); + rb_raise(rb_eTypeError, "Second argument must be a Fixnum"); } timeout.tv_sec = FIX2ULONG(seconds); timeout.tv_nsec = FIX2ULONG(nanoseconds); @@ -139,50 +151,82 @@ str = rb_str_new(buf, err); free(buf); return str; - - return Qtrue; } -VALUE posix_mqueue_timedsend(VALUE self, VALUE seconds, VALUE nanoseconds, VALUE message) +VALUE posix_mqueue_timedsend(VALUE self, VALUE args) { int err; mqueue_t* data; struct timespec timeout; + VALUE message = rb_ary_entry(args, 0); + VALUE seconds = rb_ary_entry(args, 1); + VALUE nanoseconds = rb_ary_entry(args, 2); + if (seconds == Qnil) seconds = INT2FIX(0); + if (nanoseconds == Qnil) nanoseconds = INT2FIX(0); + TypedData_Get_Struct(self, mqueue_t, &mqueue_type, data); - if (!RB_TYPE_P(seconds, T_FIXNUM)) { + if (!RB_TYPE_P(message, T_STRING)) { + rb_raise(rb_eTypeError, "Message must be a string"); + } + + if (!RB_TYPE_P(seconds, T_FIXNUM)) { rb_raise(rb_eTypeError, "First argument must be a Fixnum"); } if (!RB_TYPE_P(nanoseconds, T_FIXNUM)) { - rb_raise(rb_eTypeError, "First argument must be a fixnum"); + rb_raise(rb_eTypeError, "Second argument must be a Fixnum"); } - if (!RB_TYPE_P(message, T_STRING)) { - rb_raise(rb_eTypeError, "Message must be a string"); - } - timeout.tv_sec = FIX2ULONG(seconds); timeout.tv_nsec = FIX2ULONG(nanoseconds); err = mq_timedsend(data->fd, RSTRING_PTR(message), RSTRING_LEN(message), 10, &timeout); if (err < 0) { if(errno == 110) { - rb_raise(rb_cQueueFull, "Queue full, most likely you wanna bump /proc/sys/fs/mqueue/msg_max from the default maximum queue size of 10."); + rb_raise(rb_cQueueFull, "Queue full, most likely you want to bump /proc/sys/fs/mqueue/msg_max from the default maximum queue size of 10."); } else { rb_sys_fail("Message sending failed, please consult mq_send(3)"); } } return Qtrue; } +VALUE posix_mqueue_size(VALUE self) +{ + mqueue_t* data; + struct mq_attr queue; + + TypedData_Get_Struct(self, mqueue_t, &mqueue_type, data); + + if (mq_getattr(data->fd, &queue) < 0) { + rb_sys_fail("Failed reading queue attributes, please consult mq_getattr(3)"); + } + + return INT2FIX(queue.mq_curmsgs); +} + +VALUE posix_mqueue_msgsize(VALUE self) +{ + mqueue_t* data; + struct mq_attr queue; + + TypedData_Get_Struct(self, mqueue_t, &mqueue_type, data); + + if (mq_getattr(data->fd, &queue) < 0) { + rb_sys_fail("Failed reading queue attributes, please consult mq_getattr(3)"); + } + + return INT2FIX(queue.mq_msgsize); +} + VALUE posix_mqueue_receive(VALUE self) { int err; size_t buf_size; char *buf; @@ -195,31 +239,41 @@ buf_size = data->attr.mq_msgsize + 1; // Make sure the buffer is capable buf = (char*)malloc(buf_size); - // TODO: Specify priority + rb_thread_wait_fd(data->fd); + err = mq_receive(data->fd, buf, buf_size, NULL); + if(err < 0 && errno == EINTR) { + err = mq_receive(data->fd, buf, buf_size, NULL); + } + if (err < 0) { rb_sys_fail("Message retrieval failed, please consult mq_receive(3)"); } str = rb_str_new(buf, err); free(buf); return str; } -VALUE posix_mqueue_initialize(VALUE self, VALUE queue) +VALUE posix_mqueue_initialize(VALUE self, VALUE args) { - // TODO: Modify these options from initialize arguments - // TODO: Set nonblock and handle error in #push + VALUE options = rb_ary_entry(args, 1); + if (options == Qnil) options = rb_hash_new(); + + int msgsize = FIX2INT(rb_hash_lookup2(options, ID2SYM(rb_intern("msgsize")), INT2FIX(4096))); + int maxmsg = FIX2INT(rb_hash_lookup2(options, ID2SYM(rb_intern("maxmsg")), INT2FIX(10))); + VALUE queue = rb_ary_entry(args, 0); + struct mq_attr attr = { .mq_flags = 0, // Flags, 0 or O_NONBLOCK - .mq_maxmsg = 10, // Max messages in queue - .mq_msgsize = 4096, // Max message size (bytes) + .mq_maxmsg = maxmsg, // Max messages in queue + .mq_msgsize = msgsize, // Max message size (bytes) .mq_curmsgs = 0 // # currently in queue }; mqueue_t* data; TypedData_Get_Struct(self, mqueue_t, &mqueue_type, data); @@ -247,13 +301,15 @@ VALUE mqueue = rb_define_class_under(posix, "Mqueue", rb_cObject); rb_cQueueFull = rb_define_class_under(mqueue, "QueueFull", rb_eStandardError); rb_cQueueEmpty = rb_define_class_under(mqueue, "QueueEmpty", rb_eStandardError); rb_define_alloc_func(mqueue, posix_mqueue_alloc); - rb_define_method(mqueue, "initialize", posix_mqueue_initialize, 1); + rb_define_method(mqueue, "initialize", posix_mqueue_initialize, -2); rb_define_method(mqueue, "send", posix_mqueue_send, 1); rb_define_method(mqueue, "receive", posix_mqueue_receive, 0); - rb_define_method(mqueue, "timedsend", posix_mqueue_timedsend, 3); - rb_define_method(mqueue, "timedreceive", posix_mqueue_timedreceive, 2); + rb_define_method(mqueue, "timedsend", posix_mqueue_timedsend, -2); + rb_define_method(mqueue, "timedreceive", posix_mqueue_timedreceive, -2); + rb_define_method(mqueue, "msgsize", posix_mqueue_msgsize, 0); + rb_define_method(mqueue, "size", posix_mqueue_size, 0); rb_define_method(mqueue, "unlink", posix_mqueue_unlink, 0); }