lib/amqp/client.rb in amqp-0.7.0 vs lib/amqp/client.rb in amqp-0.7.1

- old
+ new

@@ -1,62 +1,12 @@ # encoding: utf-8 -require File.expand_path('../frame', __FILE__) +require "amqp/basic_client" require 'uri' module AMQP - class Error < StandardError; end - - module BasicClient - attr_reader :broker - - def process_frame(frame) - if mq = channels[frame.channel] - mq.process_frame(frame) - return - end - - case frame - when Frame::Method - case method = frame.payload - when Protocol::Connection::Start - @broker = method - send Protocol::Connection::StartOk.new({:platform => 'Ruby/EventMachine', - :product => 'AMQP', - :information => 'http://github.com/ruby-amqp/amqp', - :version => VERSION}, - 'AMQPLAIN', - {:LOGIN => @settings[:user], - :PASSWORD => @settings[:pass]}, - 'en_US') - - when Protocol::Connection::Tune - send Protocol::Connection::TuneOk.new(:channel_max => 0, - :frame_max => 131072, - :heartbeat => 0) - - send Protocol::Connection::Open.new(:virtual_host => @settings[:vhost], - :capabilities => '', - :insist => @settings[:insist]) - - @on_disconnect = method(:disconnected) - - when Protocol::Connection::OpenOk - succeed(self) - - when Protocol::Connection::Close - # raise Error, "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]}" - STDERR.puts "#{method.reply_text} in #{Protocol.classes[method.class_id].methods[method.method_id]}" - - when Protocol::Connection::CloseOk - @on_disconnect.call if @on_disconnect - end - end - end - end - def self.client @client ||= BasicClient end def self.client= mod @@ -76,37 +26,67 @@ @on_disconnect ||= proc { raise Error, "Could not connect to server #{opts[:host]}:#{opts[:port]}" } timeout @settings[:timeout] if @settings[:timeout] errback { @on_disconnect.call } unless @reconnecting - @connected = false + # TCP connection "openness" + @tcp_connection_established = false + # AMQP connection "openness" + @connected = false end def connection_completed - start_tls if @settings[:ssl] + if @settings[:ssl].is_a? Hash + start_tls @settings[:ssl] + elsif @settings[:ssl] + start_tls + end + log 'connected' # @on_disconnect = proc { raise Error, 'Disconnected from server' } unless @closing @reconnecting = false end - @connected = true - @connection_status.call(:connected) if @connection_status + @tcp_connection_established = true @buf = Buffer.new send_data HEADER send_data [1, 1, VERSION_MAJOR, VERSION_MINOR].pack('C4') + + if heartbeat = @settings[:heartbeat] + init_heartbeat if (@settings[:heartbeat] = heartbeat.to_i) > 0 + end end + def init_heartbeat + @last_server_heartbeat = Time.now + + @timer ||= EM::PeriodicTimer.new(@settings[:heartbeat]) do + if connected? + if @last_server_heartbeat < (Time.now - (@settings[:heartbeat] * 2)) + log "Reconnecting due to missing server heartbeats" + reconnect(true) + else + send AMQP::Frame::Heartbeat.new + end + end + end + end + + def tcp_connection_established? + @tcp_connection_established + end # tcp_connection_established? + def connected? @connected end def unbind log 'disconnected' @connected = false - EM.next_tick { @on_disconnect.call } + EM.next_tick { @on_disconnect.call; @tcp_connection_established = false } end def add_channel(mq) @_channel_mutex.synchronize do channels[ key = (channels.keys.max || 0) + 1 ] = mq @@ -196,51 +176,64 @@ log 'reconnecting' EM.reconnect @settings[:host], @settings[:port], self end - def self.connect amqp_url_or_opts = nil - if amqp_url_or_opts.is_a?(String) - opts = parse_amqp_url(amqp_url_or_opts) - elsif amqp_url_or_opts.is_a?(Hash) - opts = amqp_url_or_opts - elsif amqp_url_or_opts.nil? - opts = Hash.new + def self.connect(arg = nil) + opts = case arg + when String then + opts = parse_connection_uri(arg) + when Hash then + arg + else + Hash.new + end + + options = AMQP.settings.merge(opts) + + if options[:username] + options[:user] = options.delete(:username) end - opts = AMQP.settings.merge(opts) - EM.connect opts[:host], opts[:port], self, opts + if options[:password] + options[:pass] = options.delete(:password) + end + + EM.connect options[:host], options[:port], self, options end def connection_status(&blk) @connection_status = blk end private + def self.parse_connection_uri(connection_string) + uri = URI.parse(connection_string) + raise("amqp:// uri required!") unless %w{amqp amqps}.include?(uri.scheme) + + opts = {} + + opts[:user] = URI.unescape(uri.user) if uri.user + opts[:pass] = URI.unescape(uri.password) if uri.password + opts[:vhost] = URI.unescape(uri.path) if uri.path + opts[:host] = uri.host if uri.host + opts[:port] = uri.port || Hash["amqp" => 5672, "amqps" => 5671][uri.scheme] + opts[:ssl] = uri.scheme == "amqps" + + opts + end + + def disconnected @connection_status.call(:disconnected) if @connection_status reconnect end def log(*args) return unless @settings[:logging] or AMQP.logging require 'pp' pp args puts - end - - def self.parse_amqp_url(amqp_url) - uri = URI.parse(amqp_url) - raise("amqp:// uri required!") unless %w{amqp amqps}.include? uri.scheme - opts = {} - opts[:user] = URI.unescape(uri.user) if uri.user - opts[:pass] = URI.unescape(uri.password) if uri.password - opts[:vhost] = URI.unescape(uri.path) if uri.path - opts[:host] = uri.host if uri.host - opts[:port] = uri.port ? uri.port : - {"amqp" => 5672, "amqps" => 5671}[uri.scheme] - opts[:ssl] = uri.scheme == "amqps" - return opts end end end