lib/amqp/client.rb in amqp-0.6.0 vs lib/amqp/client.rb in amqp-0.6.4

- old
+ new

@@ -7,11 +7,11 @@ def process_frame frame if mq = channels[frame.channel] mq.process_frame(frame) return end - + case frame when Frame::Method case method = frame.payload when Protocol::Connection::Start send Protocol::Connection::StartOk.new({:platform => 'Ruby/EventMachine', @@ -47,11 +47,11 @@ end def self.client @client ||= BasicClient end - + def self.client= mod mod.__send__ :include, AMQP @client = mod end @@ -63,27 +63,39 @@ extend AMQP.client @on_disconnect ||= proc{ raise Error, "Could not connect to server #{opts[:host]}:#{opts[:port]}" } timeout @settings[:timeout] if @settings[:timeout] - errback{ @on_disconnect.call } + errback{ @on_disconnect.call } unless @reconnecting + + @connected = false end def connection_completed + start_tls if @settings[:ssl] log 'connected' # @on_disconnect = proc{ raise Error, 'Disconnected from server' } unless @closing - @on_disconnect = method(:reconnect) + @on_disconnect = method(:disconnected) @reconnecting = false end + + @connected = true + @connection_status.call(:connected) if @connection_status + @buf = Buffer.new send_data HEADER send_data [1, 1, VERSION_MAJOR, VERSION_MINOR].pack('C4') end + def connected? + @connected + end + def unbind log 'disconnected' + @connected = false EM.next_tick{ @on_disconnect.call } end def add_channel mq (@_channel_mutex ||= Mutex.new).synchronize do @@ -93,11 +105,11 @@ end def channels @channels ||= {} end - + def receive_data data # log 'receive_data', data @buf << data while frame = Frame.parse(@buf) @@ -108,11 +120,11 @@ def process_frame frame # this is a stub meant to be # replaced by the module passed into initialize end - + def send data, opts = {} channel = opts[:channel] ||= 0 data = data.to_frame(channel) unless data.is_a? Frame data.channel = channel @@ -156,34 +168,43 @@ EM.add_timer(1){ reconnect(true) } return end unless @reconnecting + @reconnecting = true + @deferred_status = nil initialize(@settings) mqs = @channels @channels = {} mqs.each{ |_,mq| mq.reset } if mqs - - @reconnecting = true end log 'reconnecting' EM.reconnect @settings[:host], @settings[:port], self end def self.connect opts = {} opts = AMQP.settings.merge(opts) EM.connect opts[:host], opts[:port], self, opts end - + + def connection_status &blk + @connection_status = blk + end + private - + + def disconnected + @connection_status.call(:disconnected) if @connection_status + reconnect + end + def log *args return unless @settings[:logging] or AMQP.logging require 'pp' pp args puts end end -end \ No newline at end of file +end