lib/zmachine/zmq_channel.rb in zmachine-0.1.3 vs lib/zmachine/zmq_channel.rb in zmachine-0.2.0
- old
+ new
@@ -1,172 +1,131 @@
+require 'zmachine/jeromq-0.3.0-SNAPSHOT.jar'
java_import org.zeromq.ZMQ
-java_import org.zeromq.ZMsg
java_import org.zeromq.ZMQException
+require 'zmachine'
require 'zmachine/channel'
-class ZMsg
- # for performance reason we alias the method here (otherwise it uses reflections all the time!)
- java_alias :post, :send, [org.zeromq.ZMQ::Socket, Java::boolean]
-end
-
-# this needs to be moved to a seperate file
class ZMQ
class Socket
- def self.create_socket_with_opts(type, opts = {})
- socket = ZMachine.context.create_socket(type)
- socket.setLinger(opts[:linger]) if opts[:linger]
- socket.setSndHWM(opts[:sndhwm]) if opts[:sndhwm]
- socket.setRcvHWM(opts[:rcvhwm]) if opts[:rcvhwm]
- socket.set_router_mandatory(true) if type == ZMQ::ROUTER
- socket.connect(opts[:connect]) if opts[:connect]
- socket.bind(opts[:bind]) if opts[:bind]
- socket
- end
-
- def self.pair(opts = {})
- create_socket_with_opts(ZMQ::PAIR, opts)
- end
- def self.router(opts = {})
- create_socket_with_opts(ZMQ::ROUTER, opts)
- end
- def self.pull(opts = {})
- create_socket_with_opts(ZMQ::PULL, opts)
- end
- def self.push(opts = {})
- create_socket_with_opts(ZMQ::PUSH, opts)
- end
- def self.dealer(opts = {})
- create_socket_with_opts(ZMQ::DEALER, opts)
- end
- def self.pub(opts = {})
- create_socket_with_opts(ZMQ::PUB, opts)
- end
-
- def send2(a,b)
- sent = send a, ZMQ::SNDMORE | ZMQ::DONTWAIT
- sent &= send b, ZMQ::DONTWAIT
- sent
- end
-
- def send3(a,b,c)
- sent = send a, ZMQ::SNDMORE | ZMQ::DONTWAIT
- sent &= send b, ZMQ::SNDMORE | ZMQ::DONTWAIT
- sent &= send c, ZMQ::DONTWAIT
- sent
- end
+ # for performance reason we alias the method here (otherwise it uses reflections all the time!)
+ # super ugly, since we need to dynamically infer the java class of byte[]
+ java_alias :send_byte_array, :send, [[].to_java(:byte).java_class, Java::int]
+ java_alias :recv_byte_array, :recv, [Java::int]
end
end
module ZMachine
class ZMQChannel < Channel
- attr_reader :port
- attr_reader :socket
-
- def initialize(type, selector)
- super(selector)
- @socket = ZMQChannel.create_socket_with_opts type, linger: 0
+ def initialize(type)
+ super()
+ @socket = ZMachine.context.create_socket(type)
+ @bound = false
+ @connected = false
+ @closed = false
end
- def self.create_socket_with_opts(type, opts = {})
- socket = ZMachine.context.create_socket(type)
- socket.setLinger(opts[:linger]) if opts[:linger]
- socket.setSndHWM(opts[:sndhwm]) if opts[:sndhwm]
- socket.setRcvHWM(opts[:rcvhwm]) if opts[:rcvhwm]
- socket.set_router_mandatory(true) if type == ZMQ::ROUTER
- socket.connect(opts[:connect]) if opts[:connect]
- socket.bind(opts[:bind]) if opts[:bind]
- socket
+ def selectable_fd
+ @socket.fd
end
- def register
- @channel_key ||= @socket.fd.register(@selector, current_events, self)
+ def bind(address, port = nil)
+ @bound = true
+ @socket.bind(address)
end
- def bind(address)
- @port = @socket.bind(address)
+ def bound?
+ @bound
end
def connect(address)
- @socket.connect(address) if address
+ @connected = true
+ @socket.connect(address)
end
- def identity=(value)
- @socket.identity = value.to_java_bytes
+ def connection_pending?
+ false
end
- def close
- if @channel_key
- @channel_key.cancel
- @channel_key = nil
- end
+ def connected?
+ @connected
+ end
- @socket.close
+ def read_inbound_data
+ data = [@socket.recv_byte_array(0)]
+ while @socket.hasReceiveMore
+ data << @socket.recv_byte_array(0)
+ end
+ data
end
def send_data(data)
- @outbound_queue << data unless send_msg(data)
+ parts, last = data[0..-2], data.last
+ parts.each do |part|
+ @socket.send_byte_array(part, ZMQ::SNDMORE | ZMQ::DONTWAIT)
+ end
+ @socket.send_byte_array(last, ZMQ::DONTWAIT)
+ rescue ZMQException
+ @outbound_queue << data
end
- def send2(a,b)
- @socket.send2 a,b
+ # to get around iterating over an array in #send_data we pass message parts
+ # as arguments
+ def send1(a)
+ @socket.send_byte_array(a, ZMQ::DONTWAIT)
end
- def send3(a,b,c)
- @socket.send3 a,b,c
+ def send2(a, b)
+ @socket.send_byte_array(a, ZMQ::DONTWAIT | ZMQ::DONTWAIT)
+ @socket.send_byte_array(b, ZMQ::DONTWAIT)
end
- def send_msg(msg)
- msg.post(@socket, true)
- return true
- rescue ZMQException => e
- return false
+ def send3(a, b, c)
+ @socket.send_byte_array(a, ZMQ::DONTWAIT | ZMQ::DONTWAIT)
+ @socket.send_byte_array(b, ZMQ::DONTWAIT | ZMQ::DONTWAIT)
+ @socket.send_byte_array(c, ZMQ::DONTWAIT)
end
- def read_inbound_data(buffer)
- return unless has_more?
- ZMsg.recv_msg(@socket)
+ def send4(a, b, c, d)
+ @socket.send_byte_array(a, ZMQ::DONTWAIT | ZMQ::DONTWAIT)
+ @socket.send_byte_array(b, ZMQ::DONTWAIT | ZMQ::DONTWAIT)
+ @socket.send_byte_array(c, ZMQ::DONTWAIT | ZMQ::DONTWAIT)
+ @socket.send_byte_array(d, ZMQ::DONTWAIT)
end
- def write_outbound_data
- until @outbound_queue.empty?
- data = @outbound_queue.first
- if send_msg(data)
- @outbound_queue.shift
- else
- break
- end
- end
-
- return true
- end
-
def has_more?
@socket.events & ZMQ::Poller::POLLIN == ZMQ::Poller::POLLIN
end
def can_send?
- @socket.events & ZMQ::Poller::POLLOUT == ZMQ::Poller::POLLOUT
+ super and (@socket.events & ZMQ::Poller::POLLOUT == ZMQ::Poller::POLLOUT)
end
- def schedule_close(after_writing)
- true
+ def write_outbound_data
+ while can_send?
+ data = @outbound_queue.shift
+ send_data(data)
+ end
+
+ close if @close_scheduled
+ return true
end
- # TODO: fix me
- def peer_name
- sock = @socket.socket
- [sock.port, sock.inet_address.host_address]
+ def close(after_writing = false)
+ super
+ @closed = true
+ @connected = false
+ @bound = false
+ ZMachine.context.destroySocket(@socket) unless can_send?
end
- def sock_name
- sock = @socket.socket
- [sock.local_port, sock.local_address.host_address]
+ def closed?
+ @closed
end
- def current_events
- SelectionKey::OP_READ
+ def peer
+ raise RuntimeError.new("ZMQChannel has no peer")
end
end
end