lib/amqp/client.rb in amqp-0.5.2 vs lib/amqp/client.rb in amqp-0.5.3
- old
+ new
@@ -1,9 +1,10 @@
require 'amqp/frame'
-require 'pp'
module AMQP
+ class Error < Exception; end
+
module BasicClient
def process_frame frame
if mq = channels[frame.channel]
mq.process_frame(frame)
return
@@ -14,27 +15,33 @@
case method = frame.payload
when Protocol::Connection::Start
send Protocol::Connection::StartOk.new({:platform => 'Ruby/EventMachine',
:product => 'AMQP',
:information => 'http://github.com/tmm1/amqp',
- :version => '0.5.0'},
+ :version => VERSION},
'AMQPLAIN',
- {:LOGIN => 'guest',
- :PASSWORD => 'guest'},
+ {:LOGIN => @settings[:user],
+ :PASSWORD => @settings[:pass]},
'en_US')
when Protocol::Connection::Tune
send Protocol::Connection::TuneOk.new(:channel_max => 0,
:frame_max => 131072,
:heartbeat => 0)
- send Protocol::Connection::Open.new(:virtual_host => '/',
+ send Protocol::Connection::Open.new(:virtual_host => @settings[:vhost],
:capabilities => '',
:insist => false)
when Protocol::Connection::OpenOk
@dfr.succeed(self)
+
+ when Protocol::Connection::Close
+ raise Error, "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]}"
+
+ when Protocol::Connection::CloseOk
+ AMQP.stopped
end
end
end
end
@@ -46,12 +53,13 @@
mod.__send__ :include, AMQP
@client = mod
end
module Client
- def initialize dfr
+ def initialize dfr, opts = {}
@dfr = dfr
+ @settings = opts
extend AMQP.client
end
def connection_completed
log 'connected'
@@ -68,12 +76,12 @@
def channels mq = nil
@channels ||= {}
end
def receive_data data
+ # log 'receive_data', data
@buf << data
- log 'receive_data', data
while frame = Frame.parse(@buf)
log 'receive', frame
process_frame frame
end
@@ -90,44 +98,45 @@
data.channel = channel
log 'send', data
send_data data.to_s
end
- def send_data data
- log 'send_data', data
- super
+ # def send_data data
+ # log 'send_data', data
+ # super
+ # end
+
+ def close
+ send Protocol::Connection::Close.new(:reply_code => 200,
+ :reply_text => 'Goodbye',
+ :class_id => 0,
+ :method_id => 0)
end
def unbind
log 'disconnected'
end
def self.connect opts = {}
+ opts = AMQP.settings.merge(opts)
opts[:host] ||= 'localhost'
opts[:port] ||= PORT
dfr = EM::DefaultDeferrable.new
EM.run{
- EM.connect opts[:host], opts[:port], self, dfr
+ EM.connect opts[:host], opts[:port], self, dfr, opts
}
dfr
end
private
def log *args
return unless AMQP.logging
+ require 'pp'
pp args
puts
end
end
-
- def self.start *args
- @conn ||= Client.connect *args
- end
-end
-
-if $0 == __FILE__
- AMQP.start
end
\ No newline at end of file