Sha256: f3ef6b1e583db961216f608a5c3cfb97e1cf3338fa30f55997a63261acf97182
Contents?: true
Size: 1.94 KB
Versions: 2
Compression:
Stored size: 1.94 KB
Contents
java_import org.zeromq.ZMQ java_import org.zeromq.ZMsg java_import org.zeromq.ZMQException require 'zmachine/channel' module ZMachine class ZMQChannel < Channel attr_reader :port def initialize(type, selector) super(selector) @socket = ZMachine.context.create_socket(type) @socket.linger = 0 @socket.set_router_mandatory(true) if type == ZMQ::ROUTER end def register @channel_key ||= @socket.fd.register(@selector, current_events, self) end def bind(address) @port = @socket.bind(address) end def connect(address) @socket.connect(address) if address end def identity=(value) @socket.identity = value.to_java_bytes end def close if @channel_key @channel_key.cancel @channel_key = nil end @socket.close end def send_data(data) @outbound_queue << data unless send_msg(data) end def send_msg(msg) msg.java_send(:send, [org.zeromq.ZMQ::Socket], @socket) return true rescue ZMQException return false end def read_inbound_data(buffer) return unless has_more? ZMsg.recv_msg(@socket) end def write_outbound_data until @outbound_queue.empty? data = @outbound_queue.first if send_msg(data) @outbound_queue.shift else break end end return true end def has_more? @socket.events & ZMQ::Poller::POLLIN == ZMQ::Poller::POLLIN end def can_send? @socket.events & ZMQ::Poller::POLLOUT == ZMQ::Poller::POLLOUT end def schedule_close(after_writing) true end # TODO: fix me def peer_name sock = @socket.socket [sock.port, sock.inet_address.host_address] end def sock_name sock = @socket.socket [sock.local_port, sock.local_address.host_address] end def current_events SelectionKey::OP_READ end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
zmachine-0.1.1 | lib/zmachine/zmq_channel.rb |
zmachine-0.1.0 | lib/zmachine/zmq_channel.rb |