lib/amqp/client.rb in amqp-0.7.0 vs lib/amqp/client.rb in amqp-0.7.1
- old
+ new
@@ -1,62 +1,12 @@
# encoding: utf-8
-require File.expand_path('../frame', __FILE__)
+require "amqp/basic_client"
require 'uri'
module AMQP
- class Error < StandardError; end
-
- module BasicClient
- attr_reader :broker
-
- def process_frame(frame)
- if mq = channels[frame.channel]
- mq.process_frame(frame)
- return
- end
-
- case frame
- when Frame::Method
- case method = frame.payload
- when Protocol::Connection::Start
- @broker = method
- send Protocol::Connection::StartOk.new({:platform => 'Ruby/EventMachine',
- :product => 'AMQP',
- :information => 'http://github.com/ruby-amqp/amqp',
- :version => VERSION},
- 'AMQPLAIN',
- {:LOGIN => @settings[:user],
- :PASSWORD => @settings[:pass]},
- 'en_US')
-
- when Protocol::Connection::Tune
- send Protocol::Connection::TuneOk.new(:channel_max => 0,
- :frame_max => 131072,
- :heartbeat => 0)
-
- send Protocol::Connection::Open.new(:virtual_host => @settings[:vhost],
- :capabilities => '',
- :insist => @settings[:insist])
-
- @on_disconnect = method(:disconnected)
-
- when Protocol::Connection::OpenOk
- succeed(self)
-
- when Protocol::Connection::Close
- # raise Error, "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]}"
- STDERR.puts "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]}"
-
- when Protocol::Connection::CloseOk
- @on_disconnect.call if @on_disconnect
- end
- end
- end
- end
-
def self.client
@client ||= BasicClient
end
def self.client= mod
@@ -76,37 +26,67 @@
@on_disconnect ||= proc { raise Error, "Could not connect to server #{opts[:host]}:#{opts[:port]}" }
timeout @settings[:timeout] if @settings[:timeout]
errback { @on_disconnect.call } unless @reconnecting
- @connected = false
+ # TCP connection "openness"
+ @tcp_connection_established = false
+ # AMQP connection "openness"
+ @connected = false
end
def connection_completed
- start_tls if @settings[:ssl]
+ if @settings[:ssl].is_a? Hash
+ start_tls @settings[:ssl]
+ elsif @settings[:ssl]
+ start_tls
+ end
+
log 'connected'
# @on_disconnect = proc { raise Error, 'Disconnected from server' }
unless @closing
@reconnecting = false
end
- @connected = true
- @connection_status.call(:connected) if @connection_status
+ @tcp_connection_established = true
@buf = Buffer.new
send_data HEADER
send_data [1, 1, VERSION_MAJOR, VERSION_MINOR].pack('C4')
+
+ if heartbeat = @settings[:heartbeat]
+ init_heartbeat if (@settings[:heartbeat] = heartbeat.to_i) > 0
+ end
end
+ def init_heartbeat
+ @last_server_heartbeat = Time.now
+
+ @timer ||= EM::PeriodicTimer.new(@settings[:heartbeat]) do
+ if connected?
+ if @last_server_heartbeat < (Time.now - (@settings[:heartbeat] * 2))
+ log "Reconnecting due to missing server heartbeats"
+ reconnect(true)
+ else
+ send AMQP::Frame::Heartbeat.new
+ end
+ end
+ end
+ end
+
+ def tcp_connection_established?
+ @tcp_connection_established
+ end # tcp_connection_established?
+
def connected?
@connected
end
def unbind
log 'disconnected'
@connected = false
- EM.next_tick { @on_disconnect.call }
+ EM.next_tick { @on_disconnect.call; @tcp_connection_established = false }
end
def add_channel(mq)
@_channel_mutex.synchronize do
channels[ key = (channels.keys.max || 0) + 1 ] = mq
@@ -196,51 +176,64 @@
log 'reconnecting'
EM.reconnect @settings[:host], @settings[:port], self
end
- def self.connect amqp_url_or_opts = nil
- if amqp_url_or_opts.is_a?(String)
- opts = parse_amqp_url(amqp_url_or_opts)
- elsif amqp_url_or_opts.is_a?(Hash)
- opts = amqp_url_or_opts
- elsif amqp_url_or_opts.nil?
- opts = Hash.new
+ def self.connect(arg = nil)
+ opts = case arg
+ when String then
+ opts = parse_connection_uri(arg)
+ when Hash then
+ arg
+ else
+ Hash.new
+ end
+
+ options = AMQP.settings.merge(opts)
+
+ if options[:username]
+ options[:user] = options.delete(:username)
end
- opts = AMQP.settings.merge(opts)
- EM.connect opts[:host], opts[:port], self, opts
+ if options[:password]
+ options[:pass] = options.delete(:password)
+ end
+
+ EM.connect options[:host], options[:port], self, options
end
def connection_status(&blk)
@connection_status = blk
end
private
+ def self.parse_connection_uri(connection_string)
+ uri = URI.parse(connection_string)
+ raise("amqp:// uri required!") unless %w{amqp amqps}.include?(uri.scheme)
+
+ opts = {}
+
+ opts[:user] = URI.unescape(uri.user) if uri.user
+ opts[:pass] = URI.unescape(uri.password) if uri.password
+ opts[:vhost] = URI.unescape(uri.path) if uri.path
+ opts[:host] = uri.host if uri.host
+ opts[:port] = uri.port || Hash["amqp" => 5672, "amqps" => 5671][uri.scheme]
+ opts[:ssl] = uri.scheme == "amqps"
+
+ opts
+ end
+
+
def disconnected
@connection_status.call(:disconnected) if @connection_status
reconnect
end
def log(*args)
return unless @settings[:logging] or AMQP.logging
require 'pp'
pp args
puts
- end
-
- def self.parse_amqp_url(amqp_url)
- uri = URI.parse(amqp_url)
- raise("amqp:// uri required!") unless %w{amqp amqps}.include? uri.scheme
- opts = {}
- opts[:user] = URI.unescape(uri.user) if uri.user
- opts[:pass] = URI.unescape(uri.password) if uri.password
- opts[:vhost] = URI.unescape(uri.path) if uri.path
- opts[:host] = uri.host if uri.host
- opts[:port] = uri.port ? uri.port :
- {"amqp" => 5672, "amqps" => 5671}[uri.scheme]
- opts[:ssl] = uri.scheme == "amqps"
- return opts
end
end
end