lib/zmachine/zmq_channel.rb in zmachine-0.2.1 vs lib/zmachine/zmq_channel.rb in zmachine-0.3.0

- old
+ new

@@ -1,138 +1,130 @@ -require 'zmachine/jeromq-0.3.0-SNAPSHOT.jar' +require 'zmachine/jeromq-0.3.2-SNAPSHOT.jar' +java_import org.zeromq.ZMsg java_import org.zeromq.ZMQ java_import org.zeromq.ZMQException require 'zmachine' require 'zmachine/channel' class ZMQ class Socket # for performance reason we alias the method here (otherwise it uses reflections all the time!) # super ugly, since we need to dynamically infer the java class of byte[] + java_alias :send_byte_buffer, :sendByteBuffer, [Java::JavaNio::ByteBuffer.java_class, Java::int] java_alias :send_byte_array, :send, [[].to_java(:byte).java_class, Java::int] java_alias :recv_byte_array, :recv, [Java::int] + + def write(buffer) + bytes = send_byte_buffer(buffer, 0) + buffer.position(buffer.position + bytes) + end end end module ZMachine class ZMQChannel < Channel - def initialize(type) - super() - @socket = ZMachine.context.create_socket(type) - @bound = false - @connected = false - @closed = false - end + extend Forwardable - def identity=(v) - @socket.identity = v if @socket + def_delegator :@socket, :identity + def_delegator :@socket, :identity= + + def initialize + super + @raw = true end - def identity - @socket ? @socket.identity : nil - end def selectable_fd @socket.fd end - def bind(address, port = nil) + def bind(address, type) + ZMachine.logger.debug("zmachine:zmq_channel:#{__method__}", channel: self) if ZMachine.debug @bound = true + @connected = true + @socket = ZMachine.context.create_socket(type) @socket.bind(address) end def bound? @bound end - def connect(address) - @connected = true + def accept + ZMachine.logger.debug("zmachine:zmq_channel:#{__method__}", channel: self) if ZMachine.debug + self + end + + def connect(address, type) + ZMachine.logger.debug("zmachine:zmq_channel:#{__method__}", channel: self) if ZMachine.debug + @connection_pending = true + @socket = ZMachine.context.create_socket(type) @socket.connect(address) end def connection_pending? - false + @connection_pending end + def finish_connecting + ZMachine.logger.debug("zmachine:zmq_channel:#{__method__}", channel: self) if ZMachine.debug + return unless connection_pending? + @connected = true + end + def connected? @connected end def read_inbound_data - data = [@socket.recv_byte_array(0)] - while @socket.hasReceiveMore - data << @socket.recv_byte_array(0) - end + ZMachine.logger.debug("zmachine:zmq_channel:#{__method__}", channel: self) if ZMachine.debug + data = ZMsg.recv_msg(@socket) + data = String.from_java_bytes(data.first.data) unless @raw data end - def send_data(data) - parts, last = data[0..-2], data.last - parts.each do |part| - @socket.send_byte_array(part, ZMQ::SNDMORE | ZMQ::DONTWAIT) - end - @socket.send_byte_array(last, ZMQ::DONTWAIT) - rescue ZMQException - @outbound_queue << data - end - - # to get around iterating over an array in #send_data we pass message parts - # as arguments def send1(a) @socket.send_byte_array(a, ZMQ::DONTWAIT) end def send2(a, b) - @socket.send_byte_array(a, ZMQ::SNDMORE | ZMQ::DONTWAIT) + @socket.send_byte_array(a, ZMQ::SNDMORE | ZMQ::DONTWAIT) and @socket.send_byte_array(b, ZMQ::DONTWAIT) end def send3(a, b, c) - @socket.send_byte_array(a, ZMQ::SNDMORE | ZMQ::DONTWAIT) - @socket.send_byte_array(b, ZMQ::SNDMORE | ZMQ::DONTWAIT) + @socket.send_byte_array(a, ZMQ::SNDMORE | ZMQ::DONTWAIT) and + @socket.send_byte_array(b, ZMQ::SNDMORE | ZMQ::DONTWAIT) and @socket.send_byte_array(c, ZMQ::DONTWAIT) end def send4(a, b, c, d) - @socket.send_byte_array(a, ZMQ::SNDMORE | ZMQ::DONTWAIT) - @socket.send_byte_array(b, ZMQ::SNDMORE | ZMQ::DONTWAIT) - @socket.send_byte_array(c, ZMQ::SNDMORE | ZMQ::DONTWAIT) + @socket.send_byte_array(a, ZMQ::SNDMORE | ZMQ::DONTWAIT) and + @socket.send_byte_array(b, ZMQ::SNDMORE | ZMQ::DONTWAIT) and + @socket.send_byte_array(c, ZMQ::SNDMORE | ZMQ::DONTWAIT) and @socket.send_byte_array(d, ZMQ::DONTWAIT) end - def has_more? - @socket.events & ZMQ::Poller::POLLIN == ZMQ::Poller::POLLIN - end - - def can_send? - super and (@socket.events & ZMQ::Poller::POLLOUT == ZMQ::Poller::POLLOUT) - end - - def write_outbound_data - while can_send? - data = @outbound_queue.shift - send_data(data) - end - - close if @close_scheduled - return true - end - - def close(after_writing = false) - super + def close! + ZMachine.logger.debug("zmachine:zmq_channel:#{__method__}", channel: self) if ZMachine.debug @closed = true @connected = false @bound = false - ZMachine.context.destroySocket(@socket) unless can_send? + ZMachine.context.destroySocket(@socket) end def closed? @closed end def peer raise RuntimeError.new("ZMQChannel has no peer") + end + + # see comment in ConnectionManager#process + def can_recv? + @socket.events & ZMQ::Poller::POLLIN == ZMQ::Poller::POLLIN end end end