ext/posix/mqueue.c in posix-mqueue-0.0.4 vs ext/posix/mqueue.c in posix-mqueue-0.0.5
- old
+ new
@@ -1,7 +1,8 @@
#include <ruby.h>
#include <ruby/util.h>
+#include <ruby/io.h>
#include <mqueue.h>
#include <fcntl.h>
#include <errno.h>
#include <time.h>
@@ -87,37 +88,48 @@
if (!RB_TYPE_P(message, T_STRING)) {
rb_raise(rb_eTypeError, "Message must be a string");
}
+ rb_io_wait_writable(data->fd);
+
// TODO: Custom priority
err = mq_send(data->fd, RSTRING_PTR(message), RSTRING_LEN(message), 10);
+ if(err < 0 && errno == EINTR) {
+ err = mq_send(data->fd, RSTRING_PTR(message), RSTRING_LEN(message), 10);
+ }
+
if (err < 0) {
rb_sys_fail("Message sending failed, please consult mq_send(3)");
}
return Qtrue;
}
-VALUE posix_mqueue_timedreceive(VALUE self, VALUE seconds, VALUE nanoseconds)
+VALUE posix_mqueue_timedreceive(VALUE self, VALUE args)
{
int err;
mqueue_t* data;
size_t buf_size;
char *buf;
struct timespec timeout;
VALUE str;
+ VALUE seconds = rb_ary_entry(args, 0);
+ VALUE nanoseconds = rb_ary_entry(args, 1);
+ if (seconds == Qnil) seconds = INT2FIX(0);
+ if (nanoseconds == Qnil) nanoseconds = INT2FIX(0);
+
TypedData_Get_Struct(self, mqueue_t, &mqueue_type, data);
if (!RB_TYPE_P(seconds, T_FIXNUM)) {
rb_raise(rb_eTypeError, "First argument must be a Fixnum");
}
if (!RB_TYPE_P(nanoseconds, T_FIXNUM)) {
- rb_raise(rb_eTypeError, "First argument must be a fixnum");
+ rb_raise(rb_eTypeError, "Second argument must be a Fixnum");
}
timeout.tv_sec = FIX2ULONG(seconds);
timeout.tv_nsec = FIX2ULONG(nanoseconds);
@@ -139,50 +151,82 @@
str = rb_str_new(buf, err);
free(buf);
return str;
-
- return Qtrue;
}
-VALUE posix_mqueue_timedsend(VALUE self, VALUE seconds, VALUE nanoseconds, VALUE message)
+VALUE posix_mqueue_timedsend(VALUE self, VALUE args)
{
int err;
mqueue_t* data;
struct timespec timeout;
+ VALUE message = rb_ary_entry(args, 0);
+ VALUE seconds = rb_ary_entry(args, 1);
+ VALUE nanoseconds = rb_ary_entry(args, 2);
+ if (seconds == Qnil) seconds = INT2FIX(0);
+ if (nanoseconds == Qnil) nanoseconds = INT2FIX(0);
+
TypedData_Get_Struct(self, mqueue_t, &mqueue_type, data);
- if (!RB_TYPE_P(seconds, T_FIXNUM)) {
+ if (!RB_TYPE_P(message, T_STRING)) {
+ rb_raise(rb_eTypeError, "Message must be a string");
+ }
+
+ if (!RB_TYPE_P(seconds, T_FIXNUM)) {
rb_raise(rb_eTypeError, "First argument must be a Fixnum");
}
if (!RB_TYPE_P(nanoseconds, T_FIXNUM)) {
- rb_raise(rb_eTypeError, "First argument must be a fixnum");
+ rb_raise(rb_eTypeError, "Second argument must be a Fixnum");
}
- if (!RB_TYPE_P(message, T_STRING)) {
- rb_raise(rb_eTypeError, "Message must be a string");
- }
-
timeout.tv_sec = FIX2ULONG(seconds);
timeout.tv_nsec = FIX2ULONG(nanoseconds);
err = mq_timedsend(data->fd, RSTRING_PTR(message), RSTRING_LEN(message), 10, &timeout);
if (err < 0) {
if(errno == 110) {
- rb_raise(rb_cQueueFull, "Queue full, most likely you wanna bump /proc/sys/fs/mqueue/msg_max from the default maximum queue size of 10.");
+ rb_raise(rb_cQueueFull, "Queue full, most likely you want to bump /proc/sys/fs/mqueue/msg_max from the default maximum queue size of 10.");
} else {
rb_sys_fail("Message sending failed, please consult mq_send(3)");
}
}
return Qtrue;
}
+VALUE posix_mqueue_size(VALUE self)
+{
+ mqueue_t* data;
+ struct mq_attr queue;
+
+ TypedData_Get_Struct(self, mqueue_t, &mqueue_type, data);
+
+ if (mq_getattr(data->fd, &queue) < 0) {
+ rb_sys_fail("Failed reading queue attributes, please consult mq_getattr(3)");
+ }
+
+ return INT2FIX(queue.mq_curmsgs);
+}
+
+VALUE posix_mqueue_msgsize(VALUE self)
+{
+ mqueue_t* data;
+ struct mq_attr queue;
+
+ TypedData_Get_Struct(self, mqueue_t, &mqueue_type, data);
+
+ if (mq_getattr(data->fd, &queue) < 0) {
+ rb_sys_fail("Failed reading queue attributes, please consult mq_getattr(3)");
+ }
+
+ return INT2FIX(queue.mq_msgsize);
+}
+
VALUE posix_mqueue_receive(VALUE self)
{
int err;
size_t buf_size;
char *buf;
@@ -195,31 +239,41 @@
buf_size = data->attr.mq_msgsize + 1;
// Make sure the buffer is capable
buf = (char*)malloc(buf_size);
- // TODO: Specify priority
+ rb_thread_wait_fd(data->fd);
+
err = mq_receive(data->fd, buf, buf_size, NULL);
+ if(err < 0 && errno == EINTR) {
+ err = mq_receive(data->fd, buf, buf_size, NULL);
+ }
+
if (err < 0) {
rb_sys_fail("Message retrieval failed, please consult mq_receive(3)");
}
str = rb_str_new(buf, err);
free(buf);
return str;
}
-VALUE posix_mqueue_initialize(VALUE self, VALUE queue)
+VALUE posix_mqueue_initialize(VALUE self, VALUE args)
{
- // TODO: Modify these options from initialize arguments
- // TODO: Set nonblock and handle error in #push
+ VALUE options = rb_ary_entry(args, 1);
+ if (options == Qnil) options = rb_hash_new();
+
+ int msgsize = FIX2INT(rb_hash_lookup2(options, ID2SYM(rb_intern("msgsize")), INT2FIX(4096)));
+ int maxmsg = FIX2INT(rb_hash_lookup2(options, ID2SYM(rb_intern("maxmsg")), INT2FIX(10)));
+ VALUE queue = rb_ary_entry(args, 0);
+
struct mq_attr attr = {
.mq_flags = 0, // Flags, 0 or O_NONBLOCK
- .mq_maxmsg = 10, // Max messages in queue
- .mq_msgsize = 4096, // Max message size (bytes)
+ .mq_maxmsg = maxmsg, // Max messages in queue
+ .mq_msgsize = msgsize, // Max message size (bytes)
.mq_curmsgs = 0 // # currently in queue
};
mqueue_t* data;
TypedData_Get_Struct(self, mqueue_t, &mqueue_type, data);
@@ -247,13 +301,15 @@
VALUE mqueue = rb_define_class_under(posix, "Mqueue", rb_cObject);
rb_cQueueFull = rb_define_class_under(mqueue, "QueueFull", rb_eStandardError);
rb_cQueueEmpty = rb_define_class_under(mqueue, "QueueEmpty", rb_eStandardError);
rb_define_alloc_func(mqueue, posix_mqueue_alloc);
- rb_define_method(mqueue, "initialize", posix_mqueue_initialize, 1);
+ rb_define_method(mqueue, "initialize", posix_mqueue_initialize, -2);
rb_define_method(mqueue, "send", posix_mqueue_send, 1);
rb_define_method(mqueue, "receive", posix_mqueue_receive, 0);
- rb_define_method(mqueue, "timedsend", posix_mqueue_timedsend, 3);
- rb_define_method(mqueue, "timedreceive", posix_mqueue_timedreceive, 2);
+ rb_define_method(mqueue, "timedsend", posix_mqueue_timedsend, -2);
+ rb_define_method(mqueue, "timedreceive", posix_mqueue_timedreceive, -2);
+ rb_define_method(mqueue, "msgsize", posix_mqueue_msgsize, 0);
+ rb_define_method(mqueue, "size", posix_mqueue_size, 0);
rb_define_method(mqueue, "unlink", posix_mqueue_unlink, 0);
}