lib/zmachine/connection_manager.rb in zmachine-0.3.2 vs lib/zmachine/connection_manager.rb in zmachine-0.4.0

- old
+ new

@@ -13,21 +13,21 @@ ZMachine.logger.debug("zmachine:connection_manager:#{__method__}") if ZMachine.debug @selector = selector @connections = Set.new @zmq_connections = Set.new @new_connections = Set.new - @unbound_connections = Set.new + @closing_connections = [] end def idle? @new_connections.size == 0 and @zmq_connections.none? {|c| c.channel.can_recv? } # see comment in #process end def shutdown ZMachine.logger.debug("zmachine:connection_manager:#{__method__}") if ZMachine.debug - @unbound_connections += @connections + @closing_connections += @connections.to_a cleanup end def bind(address, port_or_type, handler, *args, &block) ZMachine.logger.debug("zmachine:connection_manager:#{__method__}", address: address, port_or_type: port_or_type) if ZMachine.debug @@ -71,16 +71,16 @@ def process_connection(connection) new_connection = connection.process_events @new_connections << new_connection if new_connection rescue IOException => e - close_connection(connection, e) + close_connection(connection, false, e) end - def close_connection(connection, reason = nil) - ZMachine.logger.debug("zmachine:connection_manager:#{__method__}", connection: connection, reason: reason.inspect) if ZMachine.debug - @unbound_connections << [connection, reason] + def close_connection(connection, after_writing = false, reason = nil) + ZMachine.logger.debug("zmachine:connection_manager:#{__method__}", connection: connection, after_writing: after_writing, reason: reason.inspect) if ZMachine.debug + @closing_connections << [connection, after_writing, reason] end def add_new_connections @new_connections.each do |connection| ZMachine.logger.debug("zmachine:connection_manager:#{__method__}", connection: connection) if ZMachine.debug @@ -90,39 +90,48 @@ if connection.channel.is_a?(ZMQChannel) @zmq_connections << connection connection.connection_completed end rescue ClosedChannelException => e - @unbound_connections << [connection, e] + @closing_connections << [connection, false, e] end end @new_connections.clear end def is_connected?(connection) @connections.include?(connection) end def cleanup - return if @unbound_connections.empty? + return if @closing_connections.empty? ZMachine.logger.debug("zmachine:connection_manager:#{__method__}") if ZMachine.debug - @unbound_connections.each do |connection| - reason = nil - connection, reason = *connection if connection.is_a?(Array) - begin - @connections.delete(connection) - @zmq_connections.delete(connection) - if connection.method(:unbind).arity != 0 - connection.unbind(reason) - else - connection.unbind - end - connection.channel.close! - rescue Exception => e - ZMachine.logger.exception(e, "failed to unbind connection") if ZMachine.debug - end + closing_connections = @closing_connections + @closing_connections = [] + closing_connections.each do |connection| + unbind_connection(connection) end - @unbound_connections.clear + end + + def unbind_connection(connection) + after_writing = false + reason = nil + connection, after_writing, reason = *connection if connection.is_a?(Array) + if connection.method(:unbind).arity != 0 + connection.unbind(reason) + else + connection.unbind + end + ZMachine.logger.debug("zmachine:connection_manager:#{__method__}", connection: connection, after_writing: after_writing, can_send: connection.can_send?) if ZMachine.debug + if after_writing && connection.can_send? + ZMachine.close_connection(connection, true) + else + connection.close! + @connections.delete(connection) + @zmq_connections.delete(connection) + end + rescue Exception => e + ZMachine.logger.exception(e, "failed to unbind connection") if ZMachine.debug end private def build_connection(handler, *args)