ext/posix/mqueue.c in posix-mqueue-0.0.3 vs ext/posix/mqueue.c in posix-mqueue-0.0.4
- old
+ new
@@ -2,14 +2,18 @@
#include <ruby/util.h>
#include <mqueue.h>
#include <fcntl.h>
#include <errno.h>
+#include <time.h>
#include <stdlib.h>
#include <stdio.h>
+VALUE rb_cQueueFull = Qnil;
+VALUE rb_cQueueEmpty = Qnil;
+
typedef struct {
mqd_t fd;
struct mq_attr attr;
size_t queue_len;
char *queue;
@@ -66,11 +70,11 @@
mqueue_t* data;
TypedData_Get_Struct(self, mqueue_t, &mqueue_type, data);
if (mq_unlink(data->queue) == -1) {
- rb_sys_fail("Message queue unlinking failed");
+ rb_sys_fail("Message queue unlinking failed, please consume mq_unlink(3)");
}
return Qtrue;
}
@@ -87,16 +91,98 @@
// TODO: Custom priority
err = mq_send(data->fd, RSTRING_PTR(message), RSTRING_LEN(message), 10);
if (err < 0) {
- rb_sys_fail("Message sending failed");
+ rb_sys_fail("Message sending failed, please consult mq_send(3)");
}
return Qtrue;
}
+VALUE posix_mqueue_timedreceive(VALUE self, VALUE seconds, VALUE nanoseconds)
+{
+ int err;
+ mqueue_t* data;
+ size_t buf_size;
+ char *buf;
+ struct timespec timeout;
+ VALUE str;
+
+ TypedData_Get_Struct(self, mqueue_t, &mqueue_type, data);
+
+ if (!RB_TYPE_P(seconds, T_FIXNUM)) {
+ rb_raise(rb_eTypeError, "First argument must be a Fixnum");
+ }
+
+ if (!RB_TYPE_P(nanoseconds, T_FIXNUM)) {
+ rb_raise(rb_eTypeError, "First argument must be a fixnum");
+ }
+
+ timeout.tv_sec = FIX2ULONG(seconds);
+ timeout.tv_nsec = FIX2ULONG(nanoseconds);
+
+ buf_size = data->attr.mq_msgsize + 1;
+
+ // Make sure the buffer is capable
+ buf = (char*)malloc(buf_size);
+
+ // TODO: Specify priority
+ err = mq_timedreceive(data->fd, buf, buf_size, NULL, &timeout);
+
+ if (err < 0) {
+ if(errno == 110) {
+ rb_raise(rb_cQueueEmpty, "Queue empty");
+ } else {
+ rb_sys_fail("Message sending failed, please consult mq_send(3)");
+ }
+ }
+
+ str = rb_str_new(buf, err);
+ free(buf);
+
+ return str;
+
+ return Qtrue;
+}
+
+VALUE posix_mqueue_timedsend(VALUE self, VALUE seconds, VALUE nanoseconds, VALUE message)
+{
+ int err;
+ mqueue_t* data;
+ struct timespec timeout;
+
+ TypedData_Get_Struct(self, mqueue_t, &mqueue_type, data);
+
+ if (!RB_TYPE_P(seconds, T_FIXNUM)) {
+ rb_raise(rb_eTypeError, "First argument must be a Fixnum");
+ }
+
+ if (!RB_TYPE_P(nanoseconds, T_FIXNUM)) {
+ rb_raise(rb_eTypeError, "First argument must be a fixnum");
+ }
+
+ if (!RB_TYPE_P(message, T_STRING)) {
+ rb_raise(rb_eTypeError, "Message must be a string");
+ }
+
+ timeout.tv_sec = FIX2ULONG(seconds);
+ timeout.tv_nsec = FIX2ULONG(nanoseconds);
+
+ err = mq_timedsend(data->fd, RSTRING_PTR(message), RSTRING_LEN(message), 10, &timeout);
+
+ if (err < 0) {
+ if(errno == 110) {
+ rb_raise(rb_cQueueFull, "Queue full, most likely you wanna bump /proc/sys/fs/mqueue/msg_max from the default maximum queue size of 10.");
+ } else {
+ rb_sys_fail("Message sending failed, please consult mq_send(3)");
+ }
+ }
+
+ return Qtrue;
+}
+
VALUE posix_mqueue_receive(VALUE self)
{
int err;
size_t buf_size;
char *buf;
@@ -113,11 +199,11 @@
// TODO: Specify priority
err = mq_receive(data->fd, buf, buf_size, NULL);
if (err < 0) {
- rb_sys_fail("Message retrieval failed");
+ rb_sys_fail("Message retrieval failed, please consult mq_receive(3)");
}
str = rb_str_new(buf, err);
free(buf);
@@ -144,25 +230,30 @@
}
data->attr = attr;
data->queue_len = RSTRING_LEN(queue);
data->queue = ruby_strdup(StringValueCStr(queue));
- data->fd = mq_open(data->queue, O_CREAT | O_RDWR, S_IRWXU | S_IRWXO | S_IRWXG, &data->attr);
+ data->fd = mq_open(data->queue, O_CREAT | O_RDWR, S_IRUSR | S_IWUSR, &data->attr);
if (data->fd == (mqd_t)-1) {
- rb_sys_fail("Failed opening the message queue");
+ rb_sys_fail("Failed opening the message queue, please consult mq_open(3)");
}
return self;
}
void Init_mqueue()
{
VALUE posix = rb_define_module("POSIX");
VALUE mqueue = rb_define_class_under(posix, "Mqueue", rb_cObject);
+ rb_cQueueFull = rb_define_class_under(mqueue, "QueueFull", rb_eStandardError);
+ rb_cQueueEmpty = rb_define_class_under(mqueue, "QueueEmpty", rb_eStandardError);
+
rb_define_alloc_func(mqueue, posix_mqueue_alloc);
rb_define_method(mqueue, "initialize", posix_mqueue_initialize, 1);
rb_define_method(mqueue, "send", posix_mqueue_send, 1);
rb_define_method(mqueue, "receive", posix_mqueue_receive, 0);
+ rb_define_method(mqueue, "timedsend", posix_mqueue_timedsend, 3);
+ rb_define_method(mqueue, "timedreceive", posix_mqueue_timedreceive, 2);
rb_define_method(mqueue, "unlink", posix_mqueue_unlink, 0);
}