ext/posix/mqueue.c in posix-mqueue-0.0.3 vs ext/posix/mqueue.c in posix-mqueue-0.0.4

- old
+ new

@@ -2,14 +2,18 @@ #include <ruby/util.h> #include <mqueue.h> #include <fcntl.h> #include <errno.h> +#include <time.h> #include <stdlib.h> #include <stdio.h> +VALUE rb_cQueueFull = Qnil; +VALUE rb_cQueueEmpty = Qnil; + typedef struct { mqd_t fd; struct mq_attr attr; size_t queue_len; char *queue; @@ -66,11 +70,11 @@ mqueue_t* data; TypedData_Get_Struct(self, mqueue_t, &mqueue_type, data); if (mq_unlink(data->queue) == -1) { - rb_sys_fail("Message queue unlinking failed"); + rb_sys_fail("Message queue unlinking failed, please consume mq_unlink(3)"); } return Qtrue; } @@ -87,16 +91,98 @@ // TODO: Custom priority err = mq_send(data->fd, RSTRING_PTR(message), RSTRING_LEN(message), 10); if (err < 0) { - rb_sys_fail("Message sending failed"); + rb_sys_fail("Message sending failed, please consult mq_send(3)"); } return Qtrue; } +VALUE posix_mqueue_timedreceive(VALUE self, VALUE seconds, VALUE nanoseconds) +{ + int err; + mqueue_t* data; + size_t buf_size; + char *buf; + struct timespec timeout; + VALUE str; + + TypedData_Get_Struct(self, mqueue_t, &mqueue_type, data); + + if (!RB_TYPE_P(seconds, T_FIXNUM)) { + rb_raise(rb_eTypeError, "First argument must be a Fixnum"); + } + + if (!RB_TYPE_P(nanoseconds, T_FIXNUM)) { + rb_raise(rb_eTypeError, "First argument must be a fixnum"); + } + + timeout.tv_sec = FIX2ULONG(seconds); + timeout.tv_nsec = FIX2ULONG(nanoseconds); + + buf_size = data->attr.mq_msgsize + 1; + + // Make sure the buffer is capable + buf = (char*)malloc(buf_size); + + // TODO: Specify priority + err = mq_timedreceive(data->fd, buf, buf_size, NULL, &timeout); + + if (err < 0) { + if(errno == 110) { + rb_raise(rb_cQueueEmpty, "Queue empty"); + } else { + rb_sys_fail("Message sending failed, please consult mq_send(3)"); + } + } + + str = rb_str_new(buf, err); + free(buf); + + return str; + + return Qtrue; +} + +VALUE posix_mqueue_timedsend(VALUE self, VALUE seconds, VALUE nanoseconds, VALUE message) +{ + int err; + mqueue_t* data; + struct timespec timeout; + + TypedData_Get_Struct(self, mqueue_t, &mqueue_type, data); + + if (!RB_TYPE_P(seconds, T_FIXNUM)) { + rb_raise(rb_eTypeError, "First argument must be a Fixnum"); + } + + if (!RB_TYPE_P(nanoseconds, T_FIXNUM)) { + rb_raise(rb_eTypeError, "First argument must be a fixnum"); + } + + if (!RB_TYPE_P(message, T_STRING)) { + rb_raise(rb_eTypeError, "Message must be a string"); + } + + timeout.tv_sec = FIX2ULONG(seconds); + timeout.tv_nsec = FIX2ULONG(nanoseconds); + + err = mq_timedsend(data->fd, RSTRING_PTR(message), RSTRING_LEN(message), 10, &timeout); + + if (err < 0) { + if(errno == 110) { + rb_raise(rb_cQueueFull, "Queue full, most likely you wanna bump /proc/sys/fs/mqueue/msg_max from the default maximum queue size of 10."); + } else { + rb_sys_fail("Message sending failed, please consult mq_send(3)"); + } + } + + return Qtrue; +} + VALUE posix_mqueue_receive(VALUE self) { int err; size_t buf_size; char *buf; @@ -113,11 +199,11 @@ // TODO: Specify priority err = mq_receive(data->fd, buf, buf_size, NULL); if (err < 0) { - rb_sys_fail("Message retrieval failed"); + rb_sys_fail("Message retrieval failed, please consult mq_receive(3)"); } str = rb_str_new(buf, err); free(buf); @@ -144,25 +230,30 @@ } data->attr = attr; data->queue_len = RSTRING_LEN(queue); data->queue = ruby_strdup(StringValueCStr(queue)); - data->fd = mq_open(data->queue, O_CREAT | O_RDWR, S_IRWXU | S_IRWXO | S_IRWXG, &data->attr); + data->fd = mq_open(data->queue, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR, &data->attr); if (data->fd == (mqd_t)-1) { - rb_sys_fail("Failed opening the message queue"); + rb_sys_fail("Failed opening the message queue, please consult mq_open(3)"); } return self; } void Init_mqueue() { VALUE posix = rb_define_module("POSIX"); VALUE mqueue = rb_define_class_under(posix, "Mqueue", rb_cObject); + rb_cQueueFull = rb_define_class_under(mqueue, "QueueFull", rb_eStandardError); + rb_cQueueEmpty = rb_define_class_under(mqueue, "QueueEmpty", rb_eStandardError); + rb_define_alloc_func(mqueue, posix_mqueue_alloc); rb_define_method(mqueue, "initialize", posix_mqueue_initialize, 1); rb_define_method(mqueue, "send", posix_mqueue_send, 1); rb_define_method(mqueue, "receive", posix_mqueue_receive, 0); + rb_define_method(mqueue, "timedsend", posix_mqueue_timedsend, 3); + rb_define_method(mqueue, "timedreceive", posix_mqueue_timedreceive, 2); rb_define_method(mqueue, "unlink", posix_mqueue_unlink, 0); }