# # Copyright (c) 2009-2012 RightScale Inc # # Permission is hereby granted, free of charge, to any person obtaining # a copy of this software and associated documentation files (the # "Software"), to deal in the Software without restriction, including # without limitation the rights to use, copy, modify, merge, publish, # distribute, sublicense, and/or sell copies of the Software, and to # permit persons to whom the Software is furnished to do so, subject to # the following conditions: # # The above copyright notice and this permission notice shall be # included in all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. module RightAMQP # Client for accessing AMQP broker class BrokerClient include RightSupport::Log::Mixin # Set of possible broker connection status values STATUS = [ :connecting, # Initiated AMQP connection but not yet confirmed that connected :connected, # Confirmed AMQP connection :stopping, # Broker is stopping service and, although still connected, is no longer usable :disconnected, # Notified by AMQP that connection has been lost and attempting to reconnect :closed, # AMQP connection closed explicitly or because of too many failed connect attempts :failed # Failed to connect due to internal failure or AMQP failure to connect ] # (AMQP::Channel) Channel of AMQP connection used by this client attr_reader :channel # (String) Broker identity attr_reader :identity # (String) Broker alias, used in logs attr_reader :alias # (String) Host name attr_reader :host # (Integer) Port number attr_reader :port # (Integer) Unique index for broker within given set, used in alias attr_reader :index # (Symbol) AMQP connection STATUS value attr_reader :status # (Array) List of MQ::Queue queues currently subscribed attr_reader :queues # (Boolean) Whether last connect attempt failed attr_reader :last_failed # (RightSupport::Stats::Activity) AMQP lost connection statistics attr_reader :disconnect_stats # (RightSupport::Stats::Activity) AMQP connection failure statistics attr_reader :failure_stats # (Integer) Number of attempts to connect after failure attr_reader :retries # Create broker client # # === Parameters # identity(String):: Broker identity # address(Hash):: Broker address # :host(String:: IP host name or address # :port(Integer):: TCP port number for individual broker # :index(String):: Unique index for broker within set of brokers for use in forming alias # serializer(Serializer):: Serializer used for unmarshaling received messages to packets # (responds to :load); if nil, has same effect as setting subscribe option :no_unserialize # exception_stats(RightSupport::Stats::Exceptions):: Exception statistics container to be updated # whenever there is an unexpected exception # non_delivery_stats(RightSupport::Stats::Activity):: Non-delivery statistics container to be # updated whenever a message cannot be sent or received # options(Hash):: Configuration options # :user(String):: User name # :pass(String):: Password # :vhost(String):: Virtual host path name # :ssl(Boolean):: Whether SSL is enabled # :insist(Boolean):: Whether to suppress redirection of connection # :reconnect_interval(Integer):: Number of seconds between reconnect attempts # :heartbeat(Integer):: Number of seconds between AMQP connection heartbeats used to keep # connection alive, e.g., when AMQP broker is behind a firewall # :prefetch(Integer):: Maximum number of messages the AMQP broker is to prefetch for the agent # before it receives an ack. Value 1 ensures that only last unacknowledged gets redelivered # if the agent crashes. Value 0 means unlimited prefetch. # :fiber_pool(NB::FiberPool):: Pool of initialized fibers to be used for asynchronous message # processing (can be overridden with subscribe option) # :exception_on_receive_callback(Proc):: Callback activated on a receive exception with parameters # message(Object):: Message received # exception(Exception):: Exception raised # :update_status_callback(Proc):: Callback activated on a connection status change with parameters # broker(BrokerClient):: Broker client # connected_before(Boolean):: Whether was connected prior to this status change # :return_message_callback(Proc):: Callback activated when a message is returned with parameters # to(String):: Queue to which message was published # reason(String):: Reason for return # "NO_ROUTE" - queue does not exist # "NO_CONSUMERS" - queue exists but it has no consumers, or if :immediate was specified, # all consumers are not immediately ready to consume # "ACCESS_REFUSED" - queue not usable because broker is in the process of stopping service # message(String):: Returned serialized message # # existing(BrokerClient|nil):: Existing broker client for this address, or nil if none # # === Raise # ArgumentError:: If serializer does not respond to :dump and :load def initialize(identity, address, serializer, exception_stats, non_delivery_stats, options, existing = nil) @options = options @identity = identity @host = address[:host] @port = address[:port].to_i @index = address[:index].to_i set_alias(@index) unless serializer.nil? || [:dump, :load].all? { |m| serializer.respond_to?(m) } raise ArgumentError, "serializer must be a class/object that responds to :dump and :load" end @serializer = serializer @queues = [] @last_failed = false @exception_stats = exception_stats @non_delivery_stats = non_delivery_stats @disconnect_stats = RightSupport::Stats::Activity.new(measure_rate = false) @failure_stats = RightSupport::Stats::Activity.new(measure_rate = false) @retries = 0 connect(address, @options[:reconnect_interval]) if existing @disconnect_stats = existing.disconnect_stats @failure_stats = existing.failure_stats @last_failed = existing.last_failed @retries = existing.retries update_failure if @status == :failed end end # Set alias for broker for use in logs # # === Parameters # index(Integer):: Unique index for broker within given set # # === Return # (String):: Broker alias def set_alias(index) @alias = "b#{index}" end # Determine whether the broker connection is usable, i.e., connecting or confirmed connected # # === Return # (Boolean):: true if usable, otherwise false def usable? [:connected, :connecting].include?(@status) end # Determine whether this client is currently connected to the broker # # === Return # (Boolean):: true if connected, otherwise false def connected? @status == :connected end # Determine whether the broker connection has failed # # === Return # (Boolean):: true if failed, otherwise false def failed?(backoff = false) @status == :failed end # Subscribe an AMQP queue to an AMQP exchange # Do not wait for confirmation from broker that subscription is complete # When a message is received, acknowledge, unserialize, and log it as specified # If the message is unserialized and it is not of the right type, it is dropped after logging an error # # === Parameters # queue(Hash):: AMQP queue being subscribed with keys :name and :options, # which are the standard AMQP ones plus # :no_declare(Boolean):: Whether to skip declaring this queue on the broker # to cause its creation; for use when caller does not have permission to create or # knows the queue already exists and wants to avoid declare overhead # exchange(Hash|nil):: AMQP exchange to subscribe to with keys :type, :name, and :options, # nil means use empty exchange by directly subscribing to queue; the :options are the # standard AMQP ones plus # :no_declare(Boolean):: Whether to skip declaring this exchange on the broker # to cause its creation; for use when caller does not have create permission or # knows the exchange already exists and wants to avoid declare overhead # options(Hash):: Subscribe options: # :ack(Boolean):: Whether caller takes responsibility for explicitly acknowledging each # message received, defaults to implicit acknowledgement in AMQP as part of message receipt # :no_unserialize(Boolean):: Do not unserialize message, this is an escape for special # situations like enrollment, also implicitly disables receive filtering and logging; # this option is implicitly invoked if initialize without a serializer # (packet class)(Array(Symbol)):: Filters to be applied in to_s when logging packet to :info, # only packet classes specified are accepted, others are not processed but are logged with error # :category(String):: Packet category description to be used in error messages # :log_data(String):: Additional data to display at end of log entry # :no_log(Boolean):: Disable receive logging unless debug level # :exchange2(Hash):: Additional exchange to which same queue is to be bound # :brokers(Array):: Identity of brokers for which to subscribe, defaults to all usable if nil or empty # :fiber_pool(NB::FiberPool):: Pool of initialized fibers to be used for asynchronous message # processing (non-nil value will override constructor option setting) # # === Block # Required block with following parameters to be called each time exchange matches a message to the queue # identity(String):: Serialized identity of broker delivering the message # message(Packet|String):: Message received, which is unserialized unless :no_unserialize was specified # header(AMQP::Protocol::Header):: Message header (optional block parameter) # # === Raise # ArgumentError:: If a block is not supplied # # === Return # (Boolean):: true if subscribe successfully or if already subscribed, otherwise false def subscribe(queue, exchange = nil, options = {}, &block) raise ArgumentError, "Must call this method with a block" unless block return false unless usable? return true unless @queues.select { |q| q.name == queue[:name] }.empty? to_exchange = if exchange if options[:exchange2] " to exchanges #{exchange[:name]} and #{options[:exchange2][:name]}" else " to exchange #{exchange[:name]}" end end queue_options = queue[:options] || {} exchange_options = (exchange && exchange[:options]) || {} begin logger.info("[setup] Subscribing queue #{queue[:name]}#{to_exchange} on broker #{@alias}") q = @channel.queue(queue[:name], queue_options) @queues << q if exchange x = @channel.__send__(exchange[:type], exchange[:name], exchange_options) binding = q.bind(x, options[:key] ? {:key => options[:key]} : {}) if exchange2 = options[:exchange2] q.bind(@channel.__send__(exchange2[:type], exchange2[:name], exchange2[:options] || {})) end q = binding end q.subscribe(options[:ack] ? {:ack => true} : {}) do |header, message| begin if pool = (options[:fiber_pool] || @options[:fiber_pool]) pool.spawn { receive(queue[:name], header, message, options, &block) } else receive(queue[:name], header, message, options, &block) end rescue StandardError => e header.ack if options[:ack] logger.exception("Failed setting up to receive message from queue #{queue.inspect} " + "on broker #{@alias}", e, :trace) @exception_stats.track("receive", e) @non_delivery_stats.update("receive failure") end end rescue StandardError => e logger.exception("Failed subscribing queue #{queue.inspect}#{to_exchange} on broker #{@alias}", e, :trace) @exception_stats.track("subscribe", e) false end end # Unsubscribe from the specified queues # Silently ignore unknown queues # # === Parameters # queue_names(Array):: Names of queues previously subscribed to # # === Block # Optional block to be called with no parameters when each unsubscribe completes # # === Return # true:: Always return true def unsubscribe(queue_names, &block) unless failed? @queues.reject! do |q| if queue_names.include?(q.name) begin logger.info("[stop] Unsubscribing queue #{q.name} on broker #{@alias}") q.unsubscribe { block.call if block } rescue StandardError => e logger.exception("Failed unsubscribing queue #{q.name} on broker #{@alias}", e, :trace) @exception_stats.track("unsubscribe", e) block.call if block end true else false end end end true end # Declare queue or exchange object but do not subscribe to it # # === Parameters # type(Symbol):: Type of object: :queue, :direct, :fanout or :topic # name(String):: Name of object # options(Hash):: Standard AMQP declare options # # === Return # (Boolean):: true if declare successfully, otherwise false def declare(type, name, options = {}) return false unless usable? begin logger.info("[setup] Declaring #{name} #{type.to_s} on broker #{@alias}") delete_amqp_resources(:queue, name) @channel.__send__(type, name, options) true rescue StandardError => e logger.exception("Failed declaring #{type.to_s} #{name} on broker #{@alias}", e, :trace) @exception_stats.track("declare", e) false end end # Check status of specified queues # Silently ignore unknown queues # If a queue whose status is being checked does not exist in the broker, # this broker connection will fail and become unusable # # === Parameters # queue_names(Array):: Names of queues previously subscribed to # # === Block # Optional block to be called each time that status for a queue is retrieved with # parameters queue name, message count, and consumer count; the counts are nil # if there was a failure while trying to retrieve them; the block is not called # for queues to which this client is not currently subscribed # # === Return # (Boolean):: true if connected, otherwise false, in which case block never gets called def queue_status(queue_names, &block) return false unless connected? @queues.each do |q| if queue_names.include?(q.name) begin q.status { |messages, consumers| block.call(q.name, messages, consumers) if block } rescue StandardError => e logger.exception("Failed checking status of queue #{q.name} on broker #{@alias}", e, :trace) @exception_stats.track("queue_status", e) block.call(q.name, nil, nil) if block end end end true end # Publish message to AMQP exchange # # === Parameters # exchange(Hash):: AMQP exchange to subscribe to with keys :type, :name, and :options, # which are the standard AMQP ones plus # :no_declare(Boolean):: Whether to skip declaring this exchange or queue on the broker # to cause its creation; for use when caller does not have create permission or # knows the object already exists and wants to avoid declare overhead # :declare(Boolean):: Whether to delete this exchange or queue from the AMQP cache # to force it to be declared on the broker and thus be created if it does not exist # packet(Packet):: Message to serialize and publish (must respond to :to_s(log_filter, # protocol_version) unless :no_serialize specified; if responds to :type, :from, :token, # and/or :one_way, these value are used if this message is returned as non-deliverable) # message(String):: Serialized message to be published # options(Hash):: Publish options -- standard AMQP ones plus # :no_serialize(Boolean):: Do not serialize packet because it is already serialized # :log_filter(Array(Symbol)):: Filters to be applied in to_s when logging packet to :info # :log_data(String):: Additional data to display at end of log entry # :no_log(Boolean):: Disable publish logging unless debug level # # === Return # (Boolean):: true if publish successfully, otherwise false def publish(exchange, packet, message, options = {}) return false unless connected? begin exchange_options = exchange[:options] || {} unless options[:no_serialize] log_data = "" unless options[:no_log] && logger.level != :debug re = "RE-" if packet.respond_to?(:tries) && !packet.tries.empty? log_filter = options[:log_filter] unless logger.level == :debug log_data = "#{re}SEND #{@alias} #{packet.to_s(log_filter, :send_version)}" if logger.level == :debug log_data += ", publish options #{options.inspect}, exchange #{exchange[:name]}, " + "type #{exchange[:type]}, options #{exchange[:options].inspect}" end log_data += ", #{options[:log_data]}" if options[:log_data] logger.info(log_data) unless log_data.empty? end end delete_amqp_resources(exchange[:type], exchange[:name]) if exchange_options[:declare] @channel.__send__(exchange[:type], exchange[:name], exchange_options).publish(message, options) true rescue StandardError => e logger.exception("Failed publishing to exchange #{exchange.inspect} on broker #{@alias}", e, :trace) @exception_stats.track("publish", e) @non_delivery_stats.update("publish failure") false end end # Delete queue # # === Parameters # name(String):: Queue name # options(Hash):: Queue declare options # # === Return # (Boolean):: true if queue was successfully deleted, otherwise false def delete(name, options = {}) deleted = false if usable? begin @queues.reject! do |q| if q.name == name @channel.queue(name, options.merge(:no_declare => true)).delete deleted = true end end unless deleted # Allowing declare to happen since queue may not exist and do not want NOT_FOUND # failure to cause AMQP channel to close @channel.queue(name, options).delete deleted = true end rescue StandardError => e logger.exception("Failed deleting queue #{name.inspect} on broker #{@alias}", e, :trace) @exception_stats.track("delete", e) end end deleted end # Delete resources from local AMQP cache # # === Parameters # type(Symbol):: Type of AMQP object # name(String):: Name of object # # === Return # true:: Always return true def delete_amqp_resources(type, name) @channel.__send__(type == :queue ? :queues : :exchanges).delete(name) true end # Close broker connection # # === Parameters # propagate(Boolean):: Whether to propagate connection status updates, defaults to true # normal(Boolean):: Whether this is a normal close vs. a failed connection, defaults to true # log(Boolean):: Whether to log that closing, defaults to true # # === Block # Optional block with no parameters to be called after connection closed # # === Return # true:: Always return true def close(propagate = true, normal = true, log = true, &block) final_status = normal ? :closed : :failed if ![:closed, :failed].include?(@status) begin logger.info("[stop] Closed connection to broker #{@alias}") if log update_status(final_status) if propagate @connection.close do @status = final_status yield if block_given? end rescue StandardError => e logger.exception("Failed to close broker #{@alias}", e, :trace) @exception_stats.track("close", e) @status = final_status yield if block_given? end else @status = final_status yield if block_given? end true end # Get broker client information summarizing its status # # === Return # (Hash):: Status of broker with keys # :identity(String):: Serialized identity # :alias(String):: Alias used in logs # :status(Symbol):: Status of connection # :disconnects(Integer):: Number of times lost connection # :failures(Integer):: Number of times connect failed # :retries(Integer):: Number of attempts to connect after failure def summary { :identity => @identity, :alias => @alias, :status => @status, :retries => @retries, :disconnects => @disconnect_stats.total, :failures => @failure_stats.total, } end # Get broker client statistics # # === Return # (Hash):: Broker client stats with keys # "alias"(String):: Broker alias # "identity"(String):: Broker identity # "status"(Status):: Status of connection # "disconnect last"(Hash|nil):: Last disconnect information with key "elapsed", or nil if none # "disconnects"(Integer|nil):: Number of times lost connection, or nil if none # "failure last"(Hash|nil):: Last connect failure information with key "elapsed", or nil if none # "failures"(Integer|nil):: Number of failed attempts to connect to broker, or nil if none # "retries"(Integer|nil):: Number of connect retries, or nil if none def stats { "alias" => @alias, "identity" => @identity, "status" => @status.to_s, "disconnect last" => @disconnect_stats.last, "disconnects" => RightSupport::Stats.nil_if_zero(@disconnect_stats.total), "failure last" => @failure_stats.last, "failures" => RightSupport::Stats.nil_if_zero(@failure_stats.total), "retries" => RightSupport::Stats.nil_if_zero(@retries) } end # Callback from AMQP with connection status or from HABrokerClient # Makes client callback with :connected or :disconnected status if boundary crossed # # === Parameters # status(Symbol):: Status of connection (:connected, :disconnected, :stopping, :failed, :closed) # # === Return # true:: Always return true def update_status(status) # Do not let closed connection regress to failed return true if status == :failed && @status == :closed # Wait until connection is ready (i.e. handshake with broker is completed) before # changing our status to connected return true if status == :connected status = :connected if status == :ready before = @status @status = status if status == :connected update_success elsif status == :failed update_failure elsif status == :disconnected && before != :disconnected @disconnect_stats.update end unless status == before || @options[:update_status_callback].nil? @options[:update_status_callback].call(self, before == :connected) end true end protected # Connect to broker and register for status updates # Also set prefetch value if specified and setup for message returns # # === Parameters # address(Hash):: Broker address # :host(String:: IP host name or address # :port(Integer):: TCP port number for individual broker # :index(String):: Unique index for broker within given set for use in forming alias # reconnect_interval(Integer):: Number of seconds between reconnect attempts # # === Return # true:: Always return true def connect(address, reconnect_interval) begin logger.info("[setup] Connecting to broker #{@identity}, alias #{@alias}") @status = :connecting @connection = AMQP.connect(:user => @options[:user], :pass => @options[:pass], :vhost => @options[:vhost], :host => address[:host], :port => address[:port], :ssl => @options[:ssl], :identity => @identity, :insist => @options[:insist] || false, :heartbeat => @options[:heartbeat], :reconnect_delay => lambda { rand(reconnect_interval) }, :reconnect_interval => reconnect_interval) @channel = MQ.new(@connection) @channel.__send__(:connection).connection_status { |status| update_status(status) } @channel.prefetch(@options[:prefetch]) if @options[:prefetch] @channel.return_message { |header, message| handle_return(header, message) } rescue StandardError => e @status = :failed @failure_stats.update logger.exception("Failed connecting to broker #{@alias}", e, :trace) @exception_stats.track("connect", e) @connection.close if @connection end end # Receive message by optionally unserializing it, passing it to the callback, and optionally # acknowledging it # # === Parameters # queue(String):: Name of queue # header(AMQP::Protocol::Header):: Message header # message(String):: Serialized packet # options(Hash):: Subscribe options # :ack(Boolean):: Whether caller takes responsibility for explicitly acknowledging each # message received, defaults to implicit acknowledgement in AMQP as part of message receipt # :no_unserialize(Boolean):: Do not unserialize message, this is an escape for special # situations like enrollment, also implicitly disables receive filtering and logging; # this option is implicitly invoked if initialize without a serializer # # === Block # Block with following parameters to be called with received message # identity(String):: Serialized identity of broker delivering the message # message(Packet|String):: Message received, which is unserialized unless :no_unserialize was specified # header(AMQP::Protocol::Header):: Message header (optional block parameter) # # === Return # true:: Always return true def receive(queue, header, message, options, &block) begin if options[:no_unserialize] || @serializer.nil? execute_callback(block, @identity, message, header) elsif message == "nil" # This happens as part of connecting an instance agent to a broker prior to version 13 header.ack if options[:ack] logger.debug("RECV #{@alias} nil message ignored") elsif packet = unserialize(queue, message, options) execute_callback(block, @identity, packet, header) elsif options[:ack] # Need to ack empty packet since no callback is being made header.ack end true rescue StandardError => e header.ack if options[:ack] logger.exception("Failed receiving message from queue #{queue.inspect} on broker #{@alias}", e, :trace) @exception_stats.track("receive", e) @non_delivery_stats.update("receive failure") end end # Unserialize message, check that it is an acceptable type, and log it # # === Parameters # queue(String):: Name of queue # message(String):: Serialized packet # options(Hash):: Subscribe options # (packet class)(Array(Symbol)):: Filters to be applied in to_s when logging packet to :info, # only packet classes specified are accepted, others are not processed but are logged with error # :category(String):: Packet category description to be used in error messages # :log_data(String):: Additional data to display at end of log entry # :no_log(Boolean):: Disable receive logging unless debug level # # === Return # (Packet|nil):: Unserialized packet or nil if not of right type or if there is an exception def unserialize(queue, message, options = {}) begin received_at = Time.now.to_f packet = @serializer.method(:load).arity.abs > 1 ? @serializer.load(message, queue) : @serializer.load(message) if options.key?(packet.class) unless options[:no_log] && logger.level != :debug re = "RE-" if packet.respond_to?(:tries) && !packet.tries.empty? packet.received_at = received_at if packet.respond_to?(:received_at) log_filter = options[packet.class] unless logger.level == :debug logger.info("#{re}RECV #{@alias} #{packet.to_s(log_filter, :recv_version)} #{options[:log_data]}") end packet else category = options[:category] + " " if options[:category] logger.error("Received invalid #{category}packet type from queue #{queue} on broker #{@alias}: #{packet.class}\n" + caller.join("\n")) nil end rescue StandardError => e # TODO Taking advantage of Serializer knowledge here even though out of scope trace = e.class.name =~ /SerializationError/ ? :caller : :trace logger.exception("Failed unserializing message from queue #{queue.inspect} on broker #{@alias}", e, trace) @exception_stats.track("receive", e) @options[:exception_on_receive_callback].call(message, e) if @options[:exception_on_receive_callback] @non_delivery_stats.update("receive failure") nil end end # Make status updates for connect success # # === Return # true:: Always return true def update_success @last_failed = false @retries = 0 true end # Make status updates for connect failure # # === Return # true:: Always return true def update_failure logger.exception("Failed to connect to broker #{@alias}") if @last_failed @retries += 1 else @last_failed = true @retries = 0 @failure_stats.update end true end # Handle message returned by broker because it could not deliver it # # === Parameters # header(AMQP::Protocol::Header):: Message header # message(String):: Serialized packet # # === Return # true:: Always return true def handle_return(header, message) begin to = if header.exchange && !header.exchange.empty? then header.exchange else header.routing_key end reason = header.reply_text callback = @options[:return_message_callback] logger.__send__(callback ? :debug : :info, "RETURN #{@alias} for #{to} because #{reason}") callback.call(@identity, to, reason, message) if callback rescue Exception => e logger.exception("Failed return #{header.inspect} of message from broker #{@alias}", e, :trace) @exception_stats.track("return", e) end true end # Execute packet receive callback, make it a separate method to ease instrumentation # # === Parameters # callback(Proc):: Proc to run # args(Array):: Array of pass-through arguments # # === Return # (Object):: Callback return value def execute_callback(callback, *args) (callback.arity == 2 ? callback.call(*args[0, 2]) : callback.call(*args)) if callback end end # BrokerClient end # RightAMQP