lib/amqp/client.rb in amqp-0.5.5 vs lib/amqp/client.rb in amqp-0.5.9
- old
+ new
@@ -58,25 +58,38 @@
include EM::Deferrable
def initialize opts = {}
@settings = opts
extend AMQP.client
+
+ @on_disconnect = proc{ raise Error, "Could not connect to server #{opts[:host]}:#{opts[:port]}" }
+
+ timeout @settings[:timeout] if @settings[:timeout]
+ errback{ @on_disconnect.call }
end
def connection_completed
log 'connected'
+ @on_disconnect = proc{ raise Error, 'Disconnected from server' }
@buf = Buffer.new
send_data HEADER
send_data [1, 1, VERSION_MAJOR, VERSION_MINOR].pack('C4')
end
+ def unbind
+ log 'disconnected'
+ @on_disconnect.call unless $!
+ end
+
def add_channel mq
- channels[ key = (channels.keys.max || 0) + 1 ] = mq
- key
+ (@_channel_mutex ||= Mutex.new).synchronize do
+ channels[ key = (channels.keys.max || 0) + 1 ] = mq
+ key
+ end
end
- def channels mq = nil
+ def channels
@channels ||= {}
end
def receive_data data
# log 'receive_data', data
@@ -109,31 +122,24 @@
def close &on_disconnect
@on_disconnect = on_disconnect if on_disconnect
callback{ |c|
- if c.channels.keys.any?
- c.channels.each do |_, mq|
+ if c.channels.any?
+ c.channels.each do |ch, mq|
mq.close
end
else
send Protocol::Connection::Close.new(:reply_code => 200,
:reply_text => 'Goodbye',
:class_id => 0,
:method_id => 0)
end
}
end
-
- def unbind
- log 'disconnected'
- end
def self.connect opts = {}
opts = AMQP.settings.merge(opts)
- opts[:host] ||= '127.0.0.1'
- opts[:port] ||= PORT
-
EM.connect opts[:host], opts[:port], self, opts
end
private
\ No newline at end of file