lib/zmachine/zmq_channel.rb in zmachine-0.1.3 vs lib/zmachine/zmq_channel.rb in zmachine-0.2.0

- old
+ new

@@ -1,172 +1,131 @@ +require 'zmachine/jeromq-0.3.0-SNAPSHOT.jar' java_import org.zeromq.ZMQ -java_import org.zeromq.ZMsg java_import org.zeromq.ZMQException +require 'zmachine' require 'zmachine/channel' -class ZMsg - # for performance reason we alias the method here (otherwise it uses reflections all the time!) - java_alias :post, :send, [org.zeromq.ZMQ::Socket, Java::boolean] -end - -# this needs to be moved to a seperate file class ZMQ class Socket - def self.create_socket_with_opts(type, opts = {}) - socket = ZMachine.context.create_socket(type) - socket.setLinger(opts[:linger]) if opts[:linger] - socket.setSndHWM(opts[:sndhwm]) if opts[:sndhwm] - socket.setRcvHWM(opts[:rcvhwm]) if opts[:rcvhwm] - socket.set_router_mandatory(true) if type == ZMQ::ROUTER - socket.connect(opts[:connect]) if opts[:connect] - socket.bind(opts[:bind]) if opts[:bind] - socket - end - - def self.pair(opts = {}) - create_socket_with_opts(ZMQ::PAIR, opts) - end - def self.router(opts = {}) - create_socket_with_opts(ZMQ::ROUTER, opts) - end - def self.pull(opts = {}) - create_socket_with_opts(ZMQ::PULL, opts) - end - def self.push(opts = {}) - create_socket_with_opts(ZMQ::PUSH, opts) - end - def self.dealer(opts = {}) - create_socket_with_opts(ZMQ::DEALER, opts) - end - def self.pub(opts = {}) - create_socket_with_opts(ZMQ::PUB, opts) - end - - def send2(a,b) - sent = send a, ZMQ::SNDMORE | ZMQ::DONTWAIT - sent &= send b, ZMQ::DONTWAIT - sent - end - - def send3(a,b,c) - sent = send a, ZMQ::SNDMORE | ZMQ::DONTWAIT - sent &= send b, ZMQ::SNDMORE | ZMQ::DONTWAIT - sent &= send c, ZMQ::DONTWAIT - sent - end + # for performance reason we alias the method here (otherwise it uses reflections all the time!) + # super ugly, since we need to dynamically infer the java class of byte[] + java_alias :send_byte_array, :send, [[].to_java(:byte).java_class, Java::int] + java_alias :recv_byte_array, :recv, [Java::int] end end module ZMachine class ZMQChannel < Channel - attr_reader :port - attr_reader :socket - - def initialize(type, selector) - super(selector) - @socket = ZMQChannel.create_socket_with_opts type, linger: 0 + def initialize(type) + super() + @socket = ZMachine.context.create_socket(type) + @bound = false + @connected = false + @closed = false end - def self.create_socket_with_opts(type, opts = {}) - socket = ZMachine.context.create_socket(type) - socket.setLinger(opts[:linger]) if opts[:linger] - socket.setSndHWM(opts[:sndhwm]) if opts[:sndhwm] - socket.setRcvHWM(opts[:rcvhwm]) if opts[:rcvhwm] - socket.set_router_mandatory(true) if type == ZMQ::ROUTER - socket.connect(opts[:connect]) if opts[:connect] - socket.bind(opts[:bind]) if opts[:bind] - socket + def selectable_fd + @socket.fd end - def register - @channel_key ||= @socket.fd.register(@selector, current_events, self) + def bind(address, port = nil) + @bound = true + @socket.bind(address) end - def bind(address) - @port = @socket.bind(address) + def bound? + @bound end def connect(address) - @socket.connect(address) if address + @connected = true + @socket.connect(address) end - def identity=(value) - @socket.identity = value.to_java_bytes + def connection_pending? + false end - def close - if @channel_key - @channel_key.cancel - @channel_key = nil - end + def connected? + @connected + end - @socket.close + def read_inbound_data + data = [@socket.recv_byte_array(0)] + while @socket.hasReceiveMore + data << @socket.recv_byte_array(0) + end + data end def send_data(data) - @outbound_queue << data unless send_msg(data) + parts, last = data[0..-2], data.last + parts.each do |part| + @socket.send_byte_array(part, ZMQ::SNDMORE | ZMQ::DONTWAIT) + end + @socket.send_byte_array(last, ZMQ::DONTWAIT) + rescue ZMQException + @outbound_queue << data end - def send2(a,b) - @socket.send2 a,b + # to get around iterating over an array in #send_data we pass message parts + # as arguments + def send1(a) + @socket.send_byte_array(a, ZMQ::DONTWAIT) end - def send3(a,b,c) - @socket.send3 a,b,c + def send2(a, b) + @socket.send_byte_array(a, ZMQ::DONTWAIT | ZMQ::DONTWAIT) + @socket.send_byte_array(b, ZMQ::DONTWAIT) end - def send_msg(msg) - msg.post(@socket, true) - return true - rescue ZMQException => e - return false + def send3(a, b, c) + @socket.send_byte_array(a, ZMQ::DONTWAIT | ZMQ::DONTWAIT) + @socket.send_byte_array(b, ZMQ::DONTWAIT | ZMQ::DONTWAIT) + @socket.send_byte_array(c, ZMQ::DONTWAIT) end - def read_inbound_data(buffer) - return unless has_more? - ZMsg.recv_msg(@socket) + def send4(a, b, c, d) + @socket.send_byte_array(a, ZMQ::DONTWAIT | ZMQ::DONTWAIT) + @socket.send_byte_array(b, ZMQ::DONTWAIT | ZMQ::DONTWAIT) + @socket.send_byte_array(c, ZMQ::DONTWAIT | ZMQ::DONTWAIT) + @socket.send_byte_array(d, ZMQ::DONTWAIT) end - def write_outbound_data - until @outbound_queue.empty? - data = @outbound_queue.first - if send_msg(data) - @outbound_queue.shift - else - break - end - end - - return true - end - def has_more? @socket.events & ZMQ::Poller::POLLIN == ZMQ::Poller::POLLIN end def can_send? - @socket.events & ZMQ::Poller::POLLOUT == ZMQ::Poller::POLLOUT + super and (@socket.events & ZMQ::Poller::POLLOUT == ZMQ::Poller::POLLOUT) end - def schedule_close(after_writing) - true + def write_outbound_data + while can_send? + data = @outbound_queue.shift + send_data(data) + end + + close if @close_scheduled + return true end - # TODO: fix me - def peer_name - sock = @socket.socket - [sock.port, sock.inet_address.host_address] + def close(after_writing = false) + super + @closed = true + @connected = false + @bound = false + ZMachine.context.destroySocket(@socket) unless can_send? end - def sock_name - sock = @socket.socket - [sock.local_port, sock.local_address.host_address] + def closed? + @closed end - def current_events - SelectionKey::OP_READ + def peer + raise RuntimeError.new("ZMQChannel has no peer") end end end