lib/amqp/client.rb in amqp-0.6.0 vs lib/amqp/client.rb in amqp-0.6.4
- old
+ new
@@ -7,11 +7,11 @@
def process_frame frame
if mq = channels[frame.channel]
mq.process_frame(frame)
return
end
-
+
case frame
when Frame::Method
case method = frame.payload
when Protocol::Connection::Start
send Protocol::Connection::StartOk.new({:platform => 'Ruby/EventMachine',
@@ -47,11 +47,11 @@
end
def self.client
@client ||= BasicClient
end
-
+
def self.client= mod
mod.__send__ :include, AMQP
@client = mod
end
@@ -63,27 +63,39 @@
extend AMQP.client
@on_disconnect ||= proc{ raise Error, "Could not connect to server #{opts[:host]}:#{opts[:port]}" }
timeout @settings[:timeout] if @settings[:timeout]
- errback{ @on_disconnect.call }
+ errback{ @on_disconnect.call } unless @reconnecting
+
+ @connected = false
end
def connection_completed
+ start_tls if @settings[:ssl]
log 'connected'
# @on_disconnect = proc{ raise Error, 'Disconnected from server' }
unless @closing
- @on_disconnect = method(:reconnect)
+ @on_disconnect = method(:disconnected)
@reconnecting = false
end
+
+ @connected = true
+ @connection_status.call(:connected) if @connection_status
+
@buf = Buffer.new
send_data HEADER
send_data [1, 1, VERSION_MAJOR, VERSION_MINOR].pack('C4')
end
+ def connected?
+ @connected
+ end
+
def unbind
log 'disconnected'
+ @connected = false
EM.next_tick{ @on_disconnect.call }
end
def add_channel mq
(@_channel_mutex ||= Mutex.new).synchronize do
@@ -93,11 +105,11 @@
end
def channels
@channels ||= {}
end
-
+
def receive_data data
# log 'receive_data', data
@buf << data
while frame = Frame.parse(@buf)
@@ -108,11 +120,11 @@
def process_frame frame
# this is a stub meant to be
# replaced by the module passed into initialize
end
-
+
def send data, opts = {}
channel = opts[:channel] ||= 0
data = data.to_frame(channel) unless data.is_a? Frame
data.channel = channel
@@ -156,34 +168,43 @@
EM.add_timer(1){ reconnect(true) }
return
end
unless @reconnecting
+ @reconnecting = true
+
@deferred_status = nil
initialize(@settings)
mqs = @channels
@channels = {}
mqs.each{ |_,mq| mq.reset } if mqs
-
- @reconnecting = true
end
log 'reconnecting'
EM.reconnect @settings[:host], @settings[:port], self
end
def self.connect opts = {}
opts = AMQP.settings.merge(opts)
EM.connect opts[:host], opts[:port], self, opts
end
-
+
+ def connection_status &blk
+ @connection_status = blk
+ end
+
private
-
+
+ def disconnected
+ @connection_status.call(:disconnected) if @connection_status
+ reconnect
+ end
+
def log *args
return unless @settings[:logging] or AMQP.logging
require 'pp'
pp args
puts
end
end
-end
\ No newline at end of file
+end