ext/posix/mqueue.c in posix-mqueue-0.0.6 vs ext/posix/mqueue.c in posix-mqueue-0.0.7
- old
+ new
@@ -13,28 +13,33 @@
VALUE rb_cQueueFull = Qnil;
VALUE rb_cQueueEmpty = Qnil;
typedef struct {
mqd_t fd;
+ VALUE io;
struct mq_attr attr;
size_t queue_len;
char *queue;
}
mqueue_t;
static void
mqueue_mark(void* ptr)
{
+ mqueue_t* data = ptr;
+ rb_gc_mark(data->io);
(void)ptr;
}
static void
mqueue_free(void* ptr)
{
mqueue_t* data = ptr;
mq_close(data->fd);
xfree(data->queue);
+ // ??
+ // xfree(data->io);
xfree(ptr);
}
static size_t
mqueue_memsize(const void* ptr)
@@ -84,12 +89,12 @@
int err;
mqueue_t* data;
TypedData_Get_Struct(self, mqueue_t, &mqueue_type, data);
- if (!RB_TYPE_P(message, T_STRING)) {
- rb_raise(rb_eTypeError, "Message must be a string");
+ if (!RB_TYPE_P(message, T_STRING)) {
+ rb_raise(rb_eTypeError, "Message must be a string");
}
rb_io_wait_writable(data->fd);
// TODO: Custom priority
@@ -100,11 +105,11 @@
}
if (err < 0) {
rb_sys_fail("Message sending failed, please consult mq_send(3)");
}
-
+
return Qtrue;
}
VALUE posix_mqueue_timedreceive(VALUE self, VALUE args)
{
@@ -120,16 +125,16 @@
if (seconds == Qnil) seconds = INT2FIX(0);
if (nanoseconds == Qnil) nanoseconds = INT2FIX(0);
TypedData_Get_Struct(self, mqueue_t, &mqueue_type, data);
- if (!RB_TYPE_P(seconds, T_FIXNUM)) {
- rb_raise(rb_eTypeError, "First argument must be a Fixnum");
+ if (!RB_TYPE_P(seconds, T_FIXNUM)) {
+ rb_raise(rb_eTypeError, "First argument must be a Fixnum");
}
- if (!RB_TYPE_P(nanoseconds, T_FIXNUM)) {
- rb_raise(rb_eTypeError, "Second argument must be a Fixnum");
+ if (!RB_TYPE_P(nanoseconds, T_FIXNUM)) {
+ rb_raise(rb_eTypeError, "Second argument must be a Fixnum");
}
timeout.tv_sec = FIX2ULONG(seconds);
timeout.tv_nsec = FIX2ULONG(nanoseconds);
@@ -167,20 +172,20 @@
if (seconds == Qnil) seconds = INT2FIX(0);
if (nanoseconds == Qnil) nanoseconds = INT2FIX(0);
TypedData_Get_Struct(self, mqueue_t, &mqueue_type, data);
- if (!RB_TYPE_P(message, T_STRING)) {
- rb_raise(rb_eTypeError, "Message must be a string");
+ if (!RB_TYPE_P(message, T_STRING)) {
+ rb_raise(rb_eTypeError, "Message must be a string");
}
if (!RB_TYPE_P(seconds, T_FIXNUM)) {
- rb_raise(rb_eTypeError, "First argument must be a Fixnum");
+ rb_raise(rb_eTypeError, "First argument must be a Fixnum");
}
- if (!RB_TYPE_P(nanoseconds, T_FIXNUM)) {
- rb_raise(rb_eTypeError, "Second argument must be a Fixnum");
+ if (!RB_TYPE_P(nanoseconds, T_FIXNUM)) {
+ rb_raise(rb_eTypeError, "Second argument must be a Fixnum");
}
timeout.tv_sec = FIX2ULONG(seconds);
timeout.tv_nsec = FIX2ULONG(nanoseconds);
@@ -191,11 +196,11 @@
rb_raise(rb_cQueueFull, "Queue full, most likely you want to bump /proc/sys/fs/mqueue/msg_max from the default maximum queue size of 10.");
} else {
rb_sys_fail("Message sending failed, please consult mq_send(3)");
}
}
-
+
return Qtrue;
}
VALUE posix_mqueue_size(VALUE self)
{
@@ -209,10 +214,18 @@
}
return INT2FIX(queue.mq_curmsgs);
}
+VALUE posix_mqueue_to_io(VALUE self)
+{
+ mqueue_t* data;
+ TypedData_Get_Struct(self, mqueue_t, &mqueue_type, data);
+
+ return data->io;
+}
+
VALUE posix_mqueue_msgsize(VALUE self)
{
mqueue_t* data;
struct mq_attr queue;
@@ -257,18 +270,34 @@
free(buf);
return str;
}
-VALUE posix_mqueue_initialize(VALUE self, VALUE args)
+VALUE posix_mqueue_initialize(int argc, VALUE* argv, VALUE self)
{
- VALUE options = rb_ary_entry(args, 1);
- if (options == Qnil) options = rb_hash_new();
+ if(argc < 1)
+ {
+ rb_raise(rb_eArgError, "initialize requires at least one argument");
+ }
+ VALUE queue = argv[0];
+ if (!RB_TYPE_P(queue, T_STRING)) {
+ rb_raise(rb_eTypeError, "Queue name must be a string");
+ }
+
+ VALUE options;
+ if (argc < 2)
+ {
+ options = rb_hash_new();
+ }
+ else
+ {
+ options = argv[1];
+ }
+
int msgsize = FIX2INT(rb_hash_lookup2(options, ID2SYM(rb_intern("msgsize")), INT2FIX(4096)));
int maxmsg = FIX2INT(rb_hash_lookup2(options, ID2SYM(rb_intern("maxmsg")), INT2FIX(10)));
- VALUE queue = rb_ary_entry(args, 0);
struct mq_attr attr = {
.mq_flags = 0, // Flags, 0 or O_NONBLOCK
.mq_maxmsg = maxmsg, // Max messages in queue
.mq_msgsize = msgsize, // Max message size (bytes)
@@ -290,10 +319,15 @@
if (data->fd == (mqd_t)-1) {
rb_sys_fail("Failed opening the message queue, please consult mq_open(3)");
}
+ VALUE args[1];
+ args[0] = INT2FIX(data->fd);
+
+ data->io = rb_class_new_instance(1, args, rb_path2class("IO"));
+
return self;
}
void Init_mqueue()
{
@@ -301,15 +335,16 @@
VALUE mqueue = rb_define_class_under(posix, "Mqueue", rb_cObject);
rb_cQueueFull = rb_define_class_under(mqueue, "QueueFull", rb_eStandardError);
rb_cQueueEmpty = rb_define_class_under(mqueue, "QueueEmpty", rb_eStandardError);
rb_define_alloc_func(mqueue, posix_mqueue_alloc);
- rb_define_method(mqueue, "initialize", posix_mqueue_initialize, -2);
+ rb_define_method(mqueue, "initialize", posix_mqueue_initialize, -1);
rb_define_method(mqueue, "send", posix_mqueue_send, 1);
rb_define_method(mqueue, "receive", posix_mqueue_receive, 0);
rb_define_method(mqueue, "timedsend", posix_mqueue_timedsend, -2);
rb_define_method(mqueue, "timedreceive", posix_mqueue_timedreceive, -2);
rb_define_method(mqueue, "msgsize", posix_mqueue_msgsize, 0);
rb_define_method(mqueue, "size", posix_mqueue_size, 0);
rb_define_method(mqueue, "unlink", posix_mqueue_unlink, 0);
+ rb_define_method(mqueue, "to_io", posix_mqueue_to_io, 0);
}