lib/amqp/client.rb in amqp-0.5.5 vs lib/amqp/client.rb in amqp-0.5.9

- old
+ new

@@ -58,25 +58,38 @@ include EM::Deferrable def initialize opts = {} @settings = opts extend AMQP.client + + @on_disconnect = proc{ raise Error, "Could not connect to server #{opts[:host]}:#{opts[:port]}" } + + timeout @settings[:timeout] if @settings[:timeout] + errback{ @on_disconnect.call } end def connection_completed log 'connected' + @on_disconnect = proc{ raise Error, 'Disconnected from server' } @buf = Buffer.new send_data HEADER send_data [1, 1, VERSION_MAJOR, VERSION_MINOR].pack('C4') end + def unbind + log 'disconnected' + @on_disconnect.call unless $! + end + def add_channel mq - channels[ key = (channels.keys.max || 0) + 1 ] = mq - key + (@_channel_mutex ||= Mutex.new).synchronize do + channels[ key = (channels.keys.max || 0) + 1 ] = mq + key + end end - def channels mq = nil + def channels @channels ||= {} end def receive_data data # log 'receive_data', data @@ -109,31 +122,24 @@ def close &on_disconnect @on_disconnect = on_disconnect if on_disconnect callback{ |c| - if c.channels.keys.any? - c.channels.each do |_, mq| + if c.channels.any? + c.channels.each do |ch, mq| mq.close end else send Protocol::Connection::Close.new(:reply_code => 200, :reply_text => 'Goodbye', :class_id => 0, :method_id => 0) end } end - - def unbind - log 'disconnected' - end def self.connect opts = {} opts = AMQP.settings.merge(opts) - opts[:host] ||= '127.0.0.1' - opts[:port] ||= PORT - EM.connect opts[:host], opts[:port], self, opts end private \ No newline at end of file