lib/zmachine/connection_manager.rb in zmachine-0.3.2 vs lib/zmachine/connection_manager.rb in zmachine-0.4.0
- old
+ new
@@ -13,21 +13,21 @@
ZMachine.logger.debug("zmachine:connection_manager:#{__method__}") if ZMachine.debug
@selector = selector
@connections = Set.new
@zmq_connections = Set.new
@new_connections = Set.new
- @unbound_connections = Set.new
+ @closing_connections = []
end
def idle?
@new_connections.size == 0 and
@zmq_connections.none? {|c| c.channel.can_recv? } # see comment in #process
end
def shutdown
ZMachine.logger.debug("zmachine:connection_manager:#{__method__}") if ZMachine.debug
- @unbound_connections += @connections
+ @closing_connections += @connections.to_a
cleanup
end
def bind(address, port_or_type, handler, *args, &block)
ZMachine.logger.debug("zmachine:connection_manager:#{__method__}", address: address, port_or_type: port_or_type) if ZMachine.debug
@@ -71,16 +71,16 @@
def process_connection(connection)
new_connection = connection.process_events
@new_connections << new_connection if new_connection
rescue IOException => e
- close_connection(connection, e)
+ close_connection(connection, false, e)
end
- def close_connection(connection, reason = nil)
- ZMachine.logger.debug("zmachine:connection_manager:#{__method__}", connection: connection, reason: reason.inspect) if ZMachine.debug
- @unbound_connections << [connection, reason]
+ def close_connection(connection, after_writing = false, reason = nil)
+ ZMachine.logger.debug("zmachine:connection_manager:#{__method__}", connection: connection, after_writing: after_writing, reason: reason.inspect) if ZMachine.debug
+ @closing_connections << [connection, after_writing, reason]
end
def add_new_connections
@new_connections.each do |connection|
ZMachine.logger.debug("zmachine:connection_manager:#{__method__}", connection: connection) if ZMachine.debug
@@ -90,39 +90,48 @@
if connection.channel.is_a?(ZMQChannel)
@zmq_connections << connection
connection.connection_completed
end
rescue ClosedChannelException => e
- @unbound_connections << [connection, e]
+ @closing_connections << [connection, false, e]
end
end
@new_connections.clear
end
def is_connected?(connection)
@connections.include?(connection)
end
def cleanup
- return if @unbound_connections.empty?
+ return if @closing_connections.empty?
ZMachine.logger.debug("zmachine:connection_manager:#{__method__}") if ZMachine.debug
- @unbound_connections.each do |connection|
- reason = nil
- connection, reason = *connection if connection.is_a?(Array)
- begin
- @connections.delete(connection)
- @zmq_connections.delete(connection)
- if connection.method(:unbind).arity != 0
- connection.unbind(reason)
- else
- connection.unbind
- end
- connection.channel.close!
- rescue Exception => e
- ZMachine.logger.exception(e, "failed to unbind connection") if ZMachine.debug
- end
+ closing_connections = @closing_connections
+ @closing_connections = []
+ closing_connections.each do |connection|
+ unbind_connection(connection)
end
- @unbound_connections.clear
+ end
+
+ def unbind_connection(connection)
+ after_writing = false
+ reason = nil
+ connection, after_writing, reason = *connection if connection.is_a?(Array)
+ if connection.method(:unbind).arity != 0
+ connection.unbind(reason)
+ else
+ connection.unbind
+ end
+ ZMachine.logger.debug("zmachine:connection_manager:#{__method__}", connection: connection, after_writing: after_writing, can_send: connection.can_send?) if ZMachine.debug
+ if after_writing && connection.can_send?
+ ZMachine.close_connection(connection, true)
+ else
+ connection.close!
+ @connections.delete(connection)
+ @zmq_connections.delete(connection)
+ end
+ rescue Exception => e
+ ZMachine.logger.exception(e, "failed to unbind connection") if ZMachine.debug
end
private
def build_connection(handler, *args)