lib/zmachine/connection.rb in zmachine-0.2.1 vs lib/zmachine/connection.rb in zmachine-0.3.0

- old
+ new

@@ -1,71 +1,75 @@ java_import java.io.IOException java_import java.nio.ByteBuffer java_import java.nio.channels.SelectionKey +require 'zmachine' + module ZMachine class Connection extend Forwardable attr_accessor :channel - attr_accessor :args - attr_accessor :block - def self.new(*args, &block) + def self.new(*args) allocate.instance_eval do - initialize(*args, &block) - @args, @block = args, block + initialize(*args) + @args = args post_init self end end # channel type dispatch - def bind(address, port_or_type) + def bind(address, port_or_type, &block) ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug - if address =~ %r{\w+://} - @channel = ZMQChannel.new(port_or_type) - @channel.bind(address) - else - @channel = TCPChannel.new - @channel.bind(address, port_or_type) - end + klass = (address =~ %r{\w+://}) ? ZMQChannel : TCPChannel + @channel = klass.new + @channel.bind(address, port_or_type) + @block = block self end - def connect(address, port_or_type) + def connect(address, port_or_type, &block) ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug - if address.nil? or address =~ %r{\w+://} - @channel = ZMQChannel.new(port_or_type) - @channel.connect(address) if address - else - @channel = TCPChannel.new - @channel.connect(address, port_or_type) - end - if @connect_timeout - @timer = ZMachine.add_timer(@connect_timeout) do - ZMachine.reactor.close_connection(self) - end - end + klass = (address.nil? || address =~ %r{\w+://}) ? ZMQChannel : TCPChannel + @channel = klass.new + @channel.connect(address, port_or_type) if address + yield self if block_given? + renew_timer self end + # callbacks + def connection_accepted + end + + def connection_completed + end + + def post_init + end + + def receive_data(data) + end + + def unbind + end + + # EventMachine Connection API + def_delegator :@channel, :bound? def_delegator :@channel, :closed? def_delegator :@channel, :connected? def_delegator :@channel, :connection_pending? - # EventMachine Connection API - - def _not_implemented - raise RuntimeError.new("API call not implemented! #{caller[0]}") - end - def close_connection(after_writing = false) - @channel.close(after_writing) + @channel.close(after_writing) do + ZMachine.close_connection(self) + end end alias :close :close_connection def close_connection_after_writing @@ -82,164 +86,55 @@ @inactivity_timeout = value end alias :set_comm_inactivity_timeout :comm_inactivity_timeout= - def connection_accepted(channel) - end - - def connection_completed - end - - def detach - _not_implemented - end - - def error? - _not_implemented - end - def get_idle_time (System.nano_time - @last_activity) / 1_000_000 end - def get_peer_cert - _not_implemented - end - def get_peername if peer = @channel.peer ::Socket.pack_sockaddr_in(*peer) end end - def get_pid - _not_implemented - end - - def get_proxied_bytes - _not_implemented - end - - def get_sock_opt(level, option) - _not_implemented - end - - def get_sockname - _not_implemented - end - - def get_status - _not_implemented - end - - def notify_readable=(mode) - _not_implemented - end - def notify_readable? true end - def notify_writable=(mode) - _not_implemented - end - def notify_writable? @channel.can_send? end - def pause - _not_implemented - end - - def paused? - _not_implemented - end - def pending_connect_timeout=(value) @connect_timeout = value end alias :set_pending_connect_timeout :pending_connect_timeout= - def post_init - end - - def proxy_completed - _not_implemented - end - - def proxy_incoming_to(conn, bufsize = 0) - _not_implemented - end - - def proxy_target_unbound - _not_implemented - end - - def receive_data(data) - end - def reconnect(server, port_or_type) ZMachine.reconnect(server, port_or_type, self) end - def resume - _not_implemented - end - def send_data(data) ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug + data = data.to_java_bytes if data.is_a?(String) # EM compat @channel.send_data(data) update_events end - def send_datagram(data, recipient_address, recipient_port) - _not_implemented - end - - def send_file_data(filename) - _not_implemented - end - - def set_sock_opt(level, optname, optval) - _not_implemented - end - - def ssl_handshake_completed - _not_implemented - end - - def ssl_verify_peer(cert) - _not_implemented - end - - def start_tls(args = {}) - _not_implemented - end - - def stop_proxying - _not_implemented - end - - def stream_file_data(filename, args = {}) - _not_implemented - end - - def unbind - end - # triggers def acceptable! client = @channel.accept - connection_accepted(client) if client.connected? ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self, client: client) if ZMachine.debug - self.class.new(*@args, &@block).tap do |instance| - instance.channel = client - end + connection = self.class.new(*@args) + connection.channel = client + @block.call(connection) if @block + connection.connection_accepted + connection end def connectable! ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug @channel.finish_connecting @@ -271,12 +166,17 @@ def register(selector) ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self, fd: @channel.selectable_fd) if ZMachine.debug @channel_key ||= @channel.selectable_fd.register(selector, current_events, self) end + def valid? + @channel_key && + @channel_key.valid? + end + def update_events - return unless @channel_key + return unless valid? ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug @channel_key.interest_ops(current_events) end def current_events @@ -299,30 +199,35 @@ return events end def process_events - return unless @channel_key + return unless valid? ZMachine.logger.debug("zmachine:connection:#{__method__}", connection: self) if ZMachine.debug if @channel_key.connectable? connectable! elsif @channel_key.acceptable? acceptable! else writable! if @channel_key.writable? readable! if @channel_key.readable? end + rescue Java::JavaNioChannels::CancelledKeyException + # channel may have been closed by write handler. ignore exception and + # wait for cleanup end def mark_active! @last_activity = System.nano_time renew_timer if @inactivity_timeout end def renew_timer @timer.cancel if @timer - @timer = ZMachine.add_timer(@inactivity_timeout) do - ZMachine.reactor.close_connection(self) + if connection_pending? && @connect_timeout + @timer = ZMachine.add_timer(@connect_timeout) { ZMachine.close_connection(self, Errno::ETIMEDOUT) } + elsif @inactivity_timeout + @timer = ZMachine.add_timer(@inactivity_timeout) { ZMachine.close_connection(self, Errno::ETIMEDOUT) } end end end end