lib/zmachine/zmq_channel.rb in zmachine-0.2.1 vs lib/zmachine/zmq_channel.rb in zmachine-0.3.0
- old
+ new
@@ -1,138 +1,130 @@
-require 'zmachine/jeromq-0.3.0-SNAPSHOT.jar'
+require 'zmachine/jeromq-0.3.2-SNAPSHOT.jar'
+java_import org.zeromq.ZMsg
java_import org.zeromq.ZMQ
java_import org.zeromq.ZMQException
require 'zmachine'
require 'zmachine/channel'
class ZMQ
class Socket
# for performance reason we alias the method here (otherwise it uses reflections all the time!)
# super ugly, since we need to dynamically infer the java class of byte[]
+ java_alias :send_byte_buffer, :sendByteBuffer, [Java::JavaNio::ByteBuffer.java_class, Java::int]
java_alias :send_byte_array, :send, [[].to_java(:byte).java_class, Java::int]
java_alias :recv_byte_array, :recv, [Java::int]
+
+ def write(buffer)
+ bytes = send_byte_buffer(buffer, 0)
+ buffer.position(buffer.position + bytes)
+ end
end
end
module ZMachine
class ZMQChannel < Channel
- def initialize(type)
- super()
- @socket = ZMachine.context.create_socket(type)
- @bound = false
- @connected = false
- @closed = false
- end
+ extend Forwardable
- def identity=(v)
- @socket.identity = v if @socket
+ def_delegator :@socket, :identity
+ def_delegator :@socket, :identity=
+
+ def initialize
+ super
+ @raw = true
end
- def identity
- @socket ? @socket.identity : nil
- end
def selectable_fd
@socket.fd
end
- def bind(address, port = nil)
+ def bind(address, type)
+ ZMachine.logger.debug("zmachine:zmq_channel:#{__method__}", channel: self) if ZMachine.debug
@bound = true
+ @connected = true
+ @socket = ZMachine.context.create_socket(type)
@socket.bind(address)
end
def bound?
@bound
end
- def connect(address)
- @connected = true
+ def accept
+ ZMachine.logger.debug("zmachine:zmq_channel:#{__method__}", channel: self) if ZMachine.debug
+ self
+ end
+
+ def connect(address, type)
+ ZMachine.logger.debug("zmachine:zmq_channel:#{__method__}", channel: self) if ZMachine.debug
+ @connection_pending = true
+ @socket = ZMachine.context.create_socket(type)
@socket.connect(address)
end
def connection_pending?
- false
+ @connection_pending
end
+ def finish_connecting
+ ZMachine.logger.debug("zmachine:zmq_channel:#{__method__}", channel: self) if ZMachine.debug
+ return unless connection_pending?
+ @connected = true
+ end
+
def connected?
@connected
end
def read_inbound_data
- data = [@socket.recv_byte_array(0)]
- while @socket.hasReceiveMore
- data << @socket.recv_byte_array(0)
- end
+ ZMachine.logger.debug("zmachine:zmq_channel:#{__method__}", channel: self) if ZMachine.debug
+ data = ZMsg.recv_msg(@socket)
+ data = String.from_java_bytes(data.first.data) unless @raw
data
end
- def send_data(data)
- parts, last = data[0..-2], data.last
- parts.each do |part|
- @socket.send_byte_array(part, ZMQ::SNDMORE | ZMQ::DONTWAIT)
- end
- @socket.send_byte_array(last, ZMQ::DONTWAIT)
- rescue ZMQException
- @outbound_queue << data
- end
-
- # to get around iterating over an array in #send_data we pass message parts
- # as arguments
def send1(a)
@socket.send_byte_array(a, ZMQ::DONTWAIT)
end
def send2(a, b)
- @socket.send_byte_array(a, ZMQ::SNDMORE | ZMQ::DONTWAIT)
+ @socket.send_byte_array(a, ZMQ::SNDMORE | ZMQ::DONTWAIT) and
@socket.send_byte_array(b, ZMQ::DONTWAIT)
end
def send3(a, b, c)
- @socket.send_byte_array(a, ZMQ::SNDMORE | ZMQ::DONTWAIT)
- @socket.send_byte_array(b, ZMQ::SNDMORE | ZMQ::DONTWAIT)
+ @socket.send_byte_array(a, ZMQ::SNDMORE | ZMQ::DONTWAIT) and
+ @socket.send_byte_array(b, ZMQ::SNDMORE | ZMQ::DONTWAIT) and
@socket.send_byte_array(c, ZMQ::DONTWAIT)
end
def send4(a, b, c, d)
- @socket.send_byte_array(a, ZMQ::SNDMORE | ZMQ::DONTWAIT)
- @socket.send_byte_array(b, ZMQ::SNDMORE | ZMQ::DONTWAIT)
- @socket.send_byte_array(c, ZMQ::SNDMORE | ZMQ::DONTWAIT)
+ @socket.send_byte_array(a, ZMQ::SNDMORE | ZMQ::DONTWAIT) and
+ @socket.send_byte_array(b, ZMQ::SNDMORE | ZMQ::DONTWAIT) and
+ @socket.send_byte_array(c, ZMQ::SNDMORE | ZMQ::DONTWAIT) and
@socket.send_byte_array(d, ZMQ::DONTWAIT)
end
- def has_more?
- @socket.events & ZMQ::Poller::POLLIN == ZMQ::Poller::POLLIN
- end
-
- def can_send?
- super and (@socket.events & ZMQ::Poller::POLLOUT == ZMQ::Poller::POLLOUT)
- end
-
- def write_outbound_data
- while can_send?
- data = @outbound_queue.shift
- send_data(data)
- end
-
- close if @close_scheduled
- return true
- end
-
- def close(after_writing = false)
- super
+ def close!
+ ZMachine.logger.debug("zmachine:zmq_channel:#{__method__}", channel: self) if ZMachine.debug
@closed = true
@connected = false
@bound = false
- ZMachine.context.destroySocket(@socket) unless can_send?
+ ZMachine.context.destroySocket(@socket)
end
def closed?
@closed
end
def peer
raise RuntimeError.new("ZMQChannel has no peer")
+ end
+
+ # see comment in ConnectionManager#process
+ def can_recv?
+ @socket.events & ZMQ::Poller::POLLIN == ZMQ::Poller::POLLIN
end
end
end