lib/amqp/client.rb in amqp-0.5.3 vs lib/amqp/client.rb in amqp-0.5.5
- old
+ new
@@ -31,17 +31,17 @@
send Protocol::Connection::Open.new(:virtual_host => @settings[:vhost],
:capabilities => '',
:insist => false)
when Protocol::Connection::OpenOk
- @dfr.succeed(self)
+ succeed(self)
when Protocol::Connection::Close
raise Error, "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]}"
when Protocol::Connection::CloseOk
- AMQP.stopped
+ @on_disconnect.call if @on_disconnect
end
end
end
end
@@ -53,12 +53,13 @@
mod.__send__ :include, AMQP
@client = mod
end
module Client
- def initialize dfr, opts = {}
- @dfr = dfr
+ include EM::Deferrable
+
+ def initialize opts = {}
@settings = opts
extend AMQP.client
end
def connection_completed
@@ -94,47 +95,52 @@
def send data, opts = {}
channel = opts[:channel] ||= 0
data = data.to_frame(channel) unless data.is_a? Frame
data.channel = channel
+
log 'send', data
send_data data.to_s
end
# def send_data data
# log 'send_data', data
# super
# end
- def close
- send Protocol::Connection::Close.new(:reply_code => 200,
- :reply_text => 'Goodbye',
- :class_id => 0,
- :method_id => 0)
+ def close &on_disconnect
+ @on_disconnect = on_disconnect if on_disconnect
+
+ callback{ |c|
+ if c.channels.keys.any?
+ c.channels.each do |_, mq|
+ mq.close
+ end
+ else
+ send Protocol::Connection::Close.new(:reply_code => 200,
+ :reply_text => 'Goodbye',
+ :class_id => 0,
+ :method_id => 0)
+ end
+ }
end
def unbind
log 'disconnected'
end
def self.connect opts = {}
opts = AMQP.settings.merge(opts)
- opts[:host] ||= 'localhost'
+ opts[:host] ||= '127.0.0.1'
opts[:port] ||= PORT
- dfr = EM::DefaultDeferrable.new
-
- EM.run{
- EM.connect opts[:host], opts[:port], self, dfr, opts
- }
-
- dfr
+ EM.connect opts[:host], opts[:port], self, opts
end
private
def log *args
- return unless AMQP.logging
+ return unless @settings[:logging] or AMQP.logging
require 'pp'
pp args
puts
end
end
\ No newline at end of file