ext/posix/mqueue.c in posix-mqueue-0.0.6 vs ext/posix/mqueue.c in posix-mqueue-0.0.7

- old
+ new

@@ -13,28 +13,33 @@ VALUE rb_cQueueFull = Qnil; VALUE rb_cQueueEmpty = Qnil; typedef struct { mqd_t fd; + VALUE io; struct mq_attr attr; size_t queue_len; char *queue; } mqueue_t; static void mqueue_mark(void* ptr) { + mqueue_t* data = ptr; + rb_gc_mark(data->io); (void)ptr; } static void mqueue_free(void* ptr) { mqueue_t* data = ptr; mq_close(data->fd); xfree(data->queue); + // ?? + // xfree(data->io); xfree(ptr); } static size_t mqueue_memsize(const void* ptr) @@ -84,12 +89,12 @@ int err; mqueue_t* data; TypedData_Get_Struct(self, mqueue_t, &mqueue_type, data); - if (!RB_TYPE_P(message, T_STRING)) { - rb_raise(rb_eTypeError, "Message must be a string"); + if (!RB_TYPE_P(message, T_STRING)) { + rb_raise(rb_eTypeError, "Message must be a string"); } rb_io_wait_writable(data->fd); // TODO: Custom priority @@ -100,11 +105,11 @@ } if (err < 0) { rb_sys_fail("Message sending failed, please consult mq_send(3)"); } - + return Qtrue; } VALUE posix_mqueue_timedreceive(VALUE self, VALUE args) { @@ -120,16 +125,16 @@ if (seconds == Qnil) seconds = INT2FIX(0); if (nanoseconds == Qnil) nanoseconds = INT2FIX(0); TypedData_Get_Struct(self, mqueue_t, &mqueue_type, data); - if (!RB_TYPE_P(seconds, T_FIXNUM)) { - rb_raise(rb_eTypeError, "First argument must be a Fixnum"); + if (!RB_TYPE_P(seconds, T_FIXNUM)) { + rb_raise(rb_eTypeError, "First argument must be a Fixnum"); } - if (!RB_TYPE_P(nanoseconds, T_FIXNUM)) { - rb_raise(rb_eTypeError, "Second argument must be a Fixnum"); + if (!RB_TYPE_P(nanoseconds, T_FIXNUM)) { + rb_raise(rb_eTypeError, "Second argument must be a Fixnum"); } timeout.tv_sec = FIX2ULONG(seconds); timeout.tv_nsec = FIX2ULONG(nanoseconds); @@ -167,20 +172,20 @@ if (seconds == Qnil) seconds = INT2FIX(0); if (nanoseconds == Qnil) nanoseconds = INT2FIX(0); TypedData_Get_Struct(self, mqueue_t, &mqueue_type, data); - if (!RB_TYPE_P(message, T_STRING)) { - rb_raise(rb_eTypeError, "Message must be a string"); + if (!RB_TYPE_P(message, T_STRING)) { + rb_raise(rb_eTypeError, "Message must be a string"); } if (!RB_TYPE_P(seconds, T_FIXNUM)) { - rb_raise(rb_eTypeError, "First argument must be a Fixnum"); + rb_raise(rb_eTypeError, "First argument must be a Fixnum"); } - if (!RB_TYPE_P(nanoseconds, T_FIXNUM)) { - rb_raise(rb_eTypeError, "Second argument must be a Fixnum"); + if (!RB_TYPE_P(nanoseconds, T_FIXNUM)) { + rb_raise(rb_eTypeError, "Second argument must be a Fixnum"); } timeout.tv_sec = FIX2ULONG(seconds); timeout.tv_nsec = FIX2ULONG(nanoseconds); @@ -191,11 +196,11 @@ rb_raise(rb_cQueueFull, "Queue full, most likely you want to bump /proc/sys/fs/mqueue/msg_max from the default maximum queue size of 10."); } else { rb_sys_fail("Message sending failed, please consult mq_send(3)"); } } - + return Qtrue; } VALUE posix_mqueue_size(VALUE self) { @@ -209,10 +214,18 @@ } return INT2FIX(queue.mq_curmsgs); } +VALUE posix_mqueue_to_io(VALUE self) +{ + mqueue_t* data; + TypedData_Get_Struct(self, mqueue_t, &mqueue_type, data); + + return data->io; +} + VALUE posix_mqueue_msgsize(VALUE self) { mqueue_t* data; struct mq_attr queue; @@ -257,18 +270,34 @@ free(buf); return str; } -VALUE posix_mqueue_initialize(VALUE self, VALUE args) +VALUE posix_mqueue_initialize(int argc, VALUE* argv, VALUE self) { - VALUE options = rb_ary_entry(args, 1); - if (options == Qnil) options = rb_hash_new(); + if(argc < 1) + { + rb_raise(rb_eArgError, "initialize requires at least one argument"); + } + VALUE queue = argv[0]; + if (!RB_TYPE_P(queue, T_STRING)) { + rb_raise(rb_eTypeError, "Queue name must be a string"); + } + + VALUE options; + if (argc < 2) + { + options = rb_hash_new(); + } + else + { + options = argv[1]; + } + int msgsize = FIX2INT(rb_hash_lookup2(options, ID2SYM(rb_intern("msgsize")), INT2FIX(4096))); int maxmsg = FIX2INT(rb_hash_lookup2(options, ID2SYM(rb_intern("maxmsg")), INT2FIX(10))); - VALUE queue = rb_ary_entry(args, 0); struct mq_attr attr = { .mq_flags = 0, // Flags, 0 or O_NONBLOCK .mq_maxmsg = maxmsg, // Max messages in queue .mq_msgsize = msgsize, // Max message size (bytes) @@ -290,10 +319,15 @@ if (data->fd == (mqd_t)-1) { rb_sys_fail("Failed opening the message queue, please consult mq_open(3)"); } + VALUE args[1]; + args[0] = INT2FIX(data->fd); + + data->io = rb_class_new_instance(1, args, rb_path2class("IO")); + return self; } void Init_mqueue() { @@ -301,15 +335,16 @@ VALUE mqueue = rb_define_class_under(posix, "Mqueue", rb_cObject); rb_cQueueFull = rb_define_class_under(mqueue, "QueueFull", rb_eStandardError); rb_cQueueEmpty = rb_define_class_under(mqueue, "QueueEmpty", rb_eStandardError); rb_define_alloc_func(mqueue, posix_mqueue_alloc); - rb_define_method(mqueue, "initialize", posix_mqueue_initialize, -2); + rb_define_method(mqueue, "initialize", posix_mqueue_initialize, -1); rb_define_method(mqueue, "send", posix_mqueue_send, 1); rb_define_method(mqueue, "receive", posix_mqueue_receive, 0); rb_define_method(mqueue, "timedsend", posix_mqueue_timedsend, -2); rb_define_method(mqueue, "timedreceive", posix_mqueue_timedreceive, -2); rb_define_method(mqueue, "msgsize", posix_mqueue_msgsize, 0); rb_define_method(mqueue, "size", posix_mqueue_size, 0); rb_define_method(mqueue, "unlink", posix_mqueue_unlink, 0); + rb_define_method(mqueue, "to_io", posix_mqueue_to_io, 0); }