# encoding: utf-8 require "amqp/int_allocator" require "amqp/exchange" require "amqp/queue" module AMQP # h2. What are AMQP channels # # To quote {http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification}: # # AMQP is a multi-channelled protocol. Channels provide a way to multiplex # a heavyweight TCP/IP connection into several light weight connections. # This makes the protocol more “firewall friendly” since port usage is predictable. # It also means that traffic shaping and other network QoS features can be easily employed. # Channels are independent of each other and can perform different functions simultaneously # with other channels, the available bandwidth being shared between the concurrent activities. # # h2. Opening a channel # # *Channels are opened asynchronously*. There are two ways to do it: using a callback or pseudo-synchronous mode. # # @example Opening a channel with a callback # # this assumes EventMachine reactor is running # AMQP.connect("amqp://guest:guest@dev.rabbitmq.com:5672") do |client| # AMQP::Channel.new(client) do |channel, open_ok| # # when this block is executed, channel is open and ready for use # end # end # # # # Unless your application needs multiple channels, this approach is recommended. Alternatively, # AMQP::Channel can be instantiated without a block. Then returned channel is not immediately open, # however, it can be used as if it was a synchronous, blocking method: # # @example Instantiating a channel that will be open eventually # # this assumes EventMachine reactor is running # AMQP.connect("amqp://guest:guest@dev.rabbitmq.com:5672") do |client| # channel = AMQP::Channel.new(client) # exchange = channel.default_exchange # # # ... # end # # # # Even though in the example above channel isn't immediately open, it is safe to declare exchanges using # it. Exchange declaration will be delayed until after channel is open. 
Same applies to queue declaration # and other operations on exchanges and queues. Library methods that rely on channel being open will be # enqueued and executed in a FIFO manner when broker confirms channel opening. # Note, however, that *this "pseudo-synchronous mode" is easy to abuse and introduce race conditions AMQP gem # cannot resolve for you*. AMQP is an inherently asynchronous protocol and AMQP gem embraces this fact. # # # h2. Key methods # # Key methods of Channel class are # # * {Channel#queue} # * {Channel#default_exchange} # * {Channel#direct} # * {Channel#fanout} # * {Channel#topic} # * {Channel#close} # # refer to documentation for those methods for usage examples. # # Channel provides a number of convenience methods that instantiate queues and exchanges # of various types associated with this channel: # # * {Channel#queue} # * {Channel#default_exchange} # * {Channel#direct} # * {Channel#fanout} # * {Channel#topic} # # # h2. Error handling # # It is possible (and, indeed, recommended) to handle channel-level exceptions by defining an errback using #on_error: # # @example Queue declaration with incompatible attributes results in a channel-level exception # AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672") do |connection, open_ok| # AMQP::Channel.new do |channel, open_ok| # puts "Channel ##{channel.id} is now open!" # # channel.on_error do |ch, close| # puts "Handling channel-level exception" # # connection.close { # EM.stop { exit } # } # end # # EventMachine.add_timer(0.4) do # # these two definitions result in a race condition. For sake of this example, # # however, it does not matter. 
Whatever definition succeeds first, 2nd one will # # cause a channel-level exception (because attributes are not identical) # AMQP::Queue.new(channel, "amqpgem.examples.channel_exception", :auto_delete => true, :durable => false) do |queue| # puts "#{queue.name} is ready to go" # end # # AMQP::Queue.new(channel, "amqpgem.examples.channel_exception", :auto_delete => true, :durable => true) do |queue| # puts "#{queue.name} is ready to go" # end # end # end # end # # # # When channel-level exception is indicated by the broker and errback defined using #on_error is run, channel is already # closed and all queue and exchange objects associated with this channel are reset. The recommended way to recover from # channel-level exceptions is to open a new channel and re-instantiate queues, exchanges and bindings your application # needs. # # # # h2. Closing a channel # # Channels are opened when objects is instantiated and closed using {#close} method when application no longer # needs it. # # @example Closing a channel your application no longer needs # # this assumes EventMachine reactor is running # AMQP.connect("amqp://guest:guest@dev.rabbitmq.com:5672") do |client| # AMQP::Channel.new(client) do |channel, open_ok| # channel.close do |close_ok| # # when this block is executed, channel is successfully closed # end # end # end # # # # # h2. RabbitMQ extensions. # # AMQP gem supports several RabbitMQ extensions that extend Channel functionality. 
# Learn more in {file:docs/VendorSpecificExtensions.textile} # # @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 2.2.5) class Channel # # Behaviours # extend RegisterEntityMixin include Entity extend ProtocolMethodHandlers register_entity :queue, AMQP::Queue register_entity :exchange, AMQP::Exchange # # API # # AMQP connection this channel is part of # @return [Connection] attr_reader :connection alias :conn :connection # Status of this channel (one of: :opening, :closing, :open, :closed) # @return [Symbol] attr_reader :status DEFAULT_REPLY_TEXT = "Goodbye".freeze attr_reader :id attr_reader :exchanges_awaiting_declare_ok, :exchanges_awaiting_delete_ok attr_reader :queues_awaiting_declare_ok, :queues_awaiting_delete_ok, :queues_awaiting_bind_ok, :queues_awaiting_unbind_ok, :queues_awaiting_purge_ok, :queues_awaiting_get_response attr_reader :consumers_awaiting_consume_ok, :consumers_awaiting_cancel_ok attr_accessor :flow_is_active # Change publisher index. Publisher index is incremented # by 1 after each Basic.Publish starting at 1. This is done # on both client and server, hence this acknowledged messages # can be matched via its delivery-tag. # # @api private attr_writer :publisher_index # @param [AMQP::Session] connection Connection to open this channel on. If not given, default AMQP # connection (accessible via {AMQP.connection}) will be used. # @param [Integer] id Channel id. Must not be greater than max channel id client and broker # negotiated on during connection setup. Almost always the right thing to do # is to let AMQP gem pick channel identifier for you. If you want to get next # channel id, use {AMQP::Channel.next_channel_id} (it is thread-safe). 
# @param [Hash] options A hash of options
#
# @example Instantiating a channel for default connection (accessible as AMQP.connection)
#
#   AMQP.start do |connection|
#     AMQP::Channel.new do |channel, open_ok|
#       # channel is ready: set up your messaging flow by creating exchanges,
#       # queues, binding them together and so on.
#     end
#   end
#
# @example Instantiating a channel for explicitly given connection
#
#   AMQP.connect do |connection|
#     AMQP::Channel.new(connection) do |channel, open_ok|
#       # ...
#     end
#   end
#
# @example Instantiating a channel with a :prefetch option
#
#   AMQP.connect do |connection|
#     AMQP::Channel.new(connection, AMQP::Channel.next_channel_id, :prefetch => 5) do |channel, open_ok|
#       # ...
#     end
#   end
#
#
# @option options [Integer] :prefetch (nil) Specifies number of messages to prefetch. Channel-specific. See {AMQP::Channel#prefetch}.
# @option options [Boolean] :auto_recovery (nil) Turns on automatic network failure recovery mode for this channel.
#   When not given, the connection's own auto-recovery setting is used.
#
# @yield [channel, open_ok] Yields open channel instance and AMQP method (channel.open-ok) instance. The latter is optional.
# @yieldparam [Channel] channel Channel that is successfully open
# @yieldparam [AMQP::Protocol::Channel::OpenOk] open_ok AMQP method (channel.open-ok) instance
#
# @raise [RuntimeError] if the EventMachine reactor is not running
# @raise [ArgumentError] if the channel id is outside of the 0..channel_max range
#
# @see AMQP::Channel#prefetch
# @api public
def initialize(connection = nil, id = nil, options = {}, &block)
  raise 'AMQP can only be used from within EM.run {}' unless EM.reactor_running?

  @connection = connection || AMQP.connection || AMQP.start
  # this means 2nd argument is options
  if id.kind_of?(Hash)
    options = options.merge(id)
    id      = @connection.next_channel_id
  end

  super(@connection)

  @id        = id || @connection.next_channel_id
  @exchanges = Hash.new
  @queues    = Hash.new
  @consumers = Hash.new
  @options   = { :auto_recovery => @connection.auto_recovering? }.merge(options)
  @auto_recovery = (!!@options[:auto_recovery])

  # we must synchronize frameset delivery. MK.
  @mutex     = Mutex.new

  reset_state!

  # 65536 is here for cases when channel is opened without passing a callback in,
  # otherwise channel_max would be nil and it causes a lot of needless headaches.
  # lets just have this default. MK.
  channel_max = if @connection.open?
                  @connection.channel_max || 65536
                else
                  65536
                end

  if channel_max != 0 && !(0..channel_max).include?(@id)
    raise ArgumentError.new("Max channel for the connection is #{channel_max}, given: #{@id}")
  end

  # we need this deferrable to mimic what AMQP gem 0.7 does to enable
  # the following (pseudo-synchronous) style of programming some people use in their
  # existing codebases:
  #
  # connection = AMQP.connect
  # channel    = AMQP::Channel.new(connection)
  # queue      = AMQP::Queue.new(channel)
  #
  # ...
  #
  # Read more about EM::Deferrable#callback behavior in EventMachine documentation. MK.
  @channel_is_open_deferrable = AMQP::Deferrable.new

  @parameter_checks = {:queue => [:durable, :exclusive, :auto_delete, :arguments], :exchange => [:type, :durable, :arguments]}

  # only send channel.open when connection is actually open. Makes it possible to
  # do c = AMQP.connect; AMQP::Channel.new(c) that is what some people do. MK.
  @connection.on_connection do
    self.open do |ch, open_ok|
      @channel_is_open_deferrable.succeed

      if block
        case block.arity
        when 1 then block.call(ch)
        else block.call(ch, open_ok)
        end # case
      end # if

      self.prefetch(options[:prefetch], false) if options[:prefetch]
    end # self.open
  end # @connection.on_connection
end

# @return [Boolean] true if this channel is in automatic recovery mode
# @see #auto_recovering?
attr_accessor :auto_recovery

# @return [Boolean] true if this channel uses automatic recovery mode
def auto_recovering?
  @auto_recovery
end # auto_recovering?

# Called by associated connection object when AMQP connection has been re-established
# (for example, after a network failure). Does nothing unless this channel is in
# automatic recovery mode.
#
# @api plugin
def auto_recover
  return unless auto_recovering?

  # callbacks registered against the previous "channel is open" deferrable are failed,
  # then a fresh deferrable is installed for the reopened channel
  @channel_is_open_deferrable.fail
  @channel_is_open_deferrable = AMQP::Deferrable.new

  self.open do
    @channel_is_open_deferrable.succeed

    # re-establish prefetch
    self.prefetch(@options[:prefetch], false) if @options[:prefetch]

    # exchanges must be recovered first because queue recovery includes recovery of bindings. MK.
    @exchanges.each { |name, e| e.auto_recover }
    @queues.each    { |name, q| q.auto_recover }
  end
end # auto_recover

# Can be used to recover channels from channel-level exceptions. Allocates a new channel id and reopens
# itself with this new id, releasing the old id after the new one is allocated.
#
# This includes recovery of known exchanges, queues and bindings, exactly the same way as when
# the client recovers from a network failure.
#
# @api public
def reuse
  old_id = @id
  # must release after we allocate a new id, otherwise we will end up
  # with the same value. MK.
  #
  # The new id is taken from the connection, the same source #initialize allocates from
  # and the one the old id is released to, so allocation and release stay paired.
  @id    = @connection.next_channel_id
  @connection.release_channel_id(old_id)

  @channel_is_open_deferrable.fail
  @channel_is_open_deferrable = AMQP::Deferrable.new

  self.open do
    @channel_is_open_deferrable.succeed

    # re-establish prefetch
    self.prefetch(@options[:prefetch], false) if @options[:prefetch]

    # exchanges must be recovered first because queue recovery includes recovery of bindings. MK.
    @exchanges.each { |name, e| e.auto_recover }
    @queues.each    { |name, q| q.auto_recover }
  end
end # reuse



# @group Declaring exchanges

# Defines, initializes and returns a direct Exchange instance.
#
# Learn more about direct exchanges in {Exchange Exchange class documentation}.
#
#
# @param [String] name (amq.direct) Exchange name.
#
# @option opts [Boolean] :passive (false) If set, the server will not create the exchange if it does not
#   already exist. The client can use this to check whether an exchange
#   exists without modifying the server state.
#
# @option opts [Boolean] :durable (false) If set when creating a new exchange, the exchange will be marked as
#   durable.
#   Durable exchanges and their bindings are recreated upon a server
#   restart (information about them is persisted). Non-durable (transient) exchanges
#   do not survive if/when a server restarts (information about them is stored exclusively
#   in RAM).
#
#
# @option opts [Boolean] :auto_delete (false) If set, the exchange is deleted when all queues have finished
#   using it. The server waits for a short period of time before
#   determining the exchange is unused to give time to the client code
#   to bind a queue to it.
#
# @option opts [Boolean] :internal (default false) If set, the exchange may not be used directly by publishers, but
#   only when bound to other exchanges. Internal exchanges are used to
#   construct wiring that is not visible to applications. This is a RabbitMQ-specific
#   extension.
#
# @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should
#   not wait for a reply method. If the server could not complete the
#   method it will raise a channel or connection exception.
#
#
# @example Using default pre-declared direct exchange and no callbacks (pseudo-synchronous style)
#
#   # an exchange application A will be using to publish updates
#   # to some search index
#   exchange = channel.direct("index.updates")
#
#   # In the same (or different) process declare a queue that broker will
#   # generate name for, bind it to aforementioned exchange using method chaining
#   queue    = channel.queue("").
#                      # queue will be receiving messages that were published with
#                      # :routing_key attribute value of "search.index.updates"
#                      bind(exchange, :routing_key => "search.index.updates").
#                      # register a callback that will be run when messages arrive
#                      subscribe { |header, message| puts("Received #{message}") }
#
#   # now publish a new document contents for indexing,
#   # message will be delivered to the queue we declared and bound on the line above
#   exchange.publish(document.content, :routing_key => "search.index.updates")
#
#
# @example Instantiating a direct exchange using {Channel#direct} with a callback
#
#   AMQP.connect do |connection|
#     AMQP::Channel.new(connection) do |channel|
#       channel.direct("email.replies_listener") do |exchange, declare_ok|
#         # by now exchange is ready and waiting
#       end
#     end
#   end
#
#
# @see Channel#default_exchange
# @see Exchange
# @see Exchange#initialize
# @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 3.1.3.1)
#
# @return [Exchange]
# @api public
def direct(name = 'amq.direct', opts = {}, &block)
  known = find_exchange(name)
  # nothing registered under this name yet: instantiate a new exchange and register it
  return register_exchange(Exchange.new(self, :direct, name, opts, &block)) unless known

  # an exchange with this name is already known to the channel: check that the
  # requested attributes match it, then hand the existing instance to the caller
  requested = Exchange.add_default_options(:direct, name, opts, block)
  validate_parameters_match!(known, requested, :exchange)

  block.call(known) if block
  known
end

# Returns exchange object with the same name as default (aka unnamed) exchange.
# Default exchange is a direct exchange and automatically routes messages to
# queues when routing key matches queue name exactly. This feature is known as
# "automatic binding" (of queues to default exchange).
#
# *Use default exchange when you want to route messages directly to specific queues*
# (queue names are known, you don't mind this kind of coupling between applications).
#
#
# @example Using default exchange to publish messages to queues with known names
#   AMQP.start(:host => 'localhost') do |connection|
#     ch        = AMQP::Channel.new(connection)
#
#     queue1    = ch.queue("queue1").subscribe do |payload|
#       puts "[#{queue1.name}] => #{payload}"
#     end
#     queue2    = ch.queue("queue2").subscribe do |payload|
#       puts "[#{queue2.name}] => #{payload}"
#     end
#     queue3    = ch.queue("queue3").subscribe do |payload|
#       puts "[#{queue3.name}] => #{payload}"
#     end
#     queues    = [queue1, queue2, queue3]
#
#     # Rely on default direct exchange binding, see section 2.1.2.4 Automatic Mode in AMQP 0.9.1 spec.
#     exchange = AMQP::Exchange.default
#     EM.add_periodic_timer(1) do
#       q = queues.sample
#
#       exchange.publish "Some payload from #{Time.now.to_i}", :routing_key => q.name
#     end
#   end
#
#
#
# @see Exchange
# @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 2.1.2.4)
#
# @return [Exchange]
# @api public
def default_exchange
  # the instance is built on first use and kept for subsequent calls
  return @default_exchange if @default_exchange

  @default_exchange = Exchange.default(self)
end

# Defines, initializes and returns a fanout Exchange instance.
#
# Learn more about fanout exchanges in {Exchange Exchange class documentation}.
#
#
# @param [String] name (amq.fanout) Exchange name.
#
# @option opts [Boolean] :passive (false) If set, the server will not create the exchange if it does not
#   already exist. The client can use this to check whether an exchange
#   exists without modifying the server state.
#
# @option opts [Boolean] :durable (false) If set when creating a new exchange, the exchange will be marked as
#   durable. Durable exchanges and their bindings are recreated upon a server
#   restart (information about them is persisted). Non-durable (transient) exchanges
#   do not survive if/when a server restarts (information about them is stored exclusively
#   in RAM).
#
#
# @option opts [Boolean] :auto_delete (false) If set, the exchange is deleted when all queues have finished
#   using it.
#   The server waits for a short period of time before
#   determining the exchange is unused to give time to the client code
#   to bind a queue to it.
#
# @option opts [Boolean] :internal (default false) If set, the exchange may not be used directly by publishers, but
#   only when bound to other exchanges. Internal exchanges are used to
#   construct wiring that is not visible to applications. This is a RabbitMQ-specific
#   extension.
#
# @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should
#   not wait for a reply method. If the server could not complete the
#   method it will raise a channel or connection exception.
#
# @example Using fanout exchange to deliver messages to multiple consumers
#
#   # open up a channel
#   # declare a fanout exchange
#   # declare 3 queues, binds them
#   # publish a message
#
# @see Exchange
# @see Exchange#initialize
# @see Channel#default_exchange
# @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 3.1.3.2)
#
# @return [Exchange]
# @api public
def fanout(name = 'amq.fanout', opts = {}, &block)
  known = find_exchange(name)
  # nothing registered under this name yet: instantiate a new exchange and register it
  return register_exchange(Exchange.new(self, :fanout, name, opts, &block)) unless known

  # an exchange with this name is already known to the channel: check that the
  # requested attributes match it, then hand the existing instance to the caller
  requested = Exchange.add_default_options(:fanout, name, opts, block)
  validate_parameters_match!(known, requested, :exchange)

  block.call(known) if block
  known
end

# Defines, initializes and returns a topic Exchange instance.
#
# Learn more about topic exchanges in {Exchange Exchange class documentation}.
#
# @param [String] name (amq.topic) Exchange name.
#
#
# @option opts [Boolean] :passive (false) If set, the server will not create the exchange if it does not
#   already exist. The client can use this to check whether an exchange
#   exists without modifying the server state.
#
# @option opts [Boolean] :durable (false) If set when creating a new exchange, the exchange will be marked as
#   durable.
#   Durable exchanges and their bindings are recreated upon a server
#   restart (information about them is persisted). Non-durable (transient) exchanges
#   do not survive if/when a server restarts (information about them is stored exclusively
#   in RAM).
#
#
# @option opts [Boolean] :auto_delete (false) If set, the exchange is deleted when all queues have finished
#   using it. The server waits for a short period of time before
#   determining the exchange is unused to give time to the client code
#   to bind a queue to it.
#
# @option opts [Boolean] :internal (default false) If set, the exchange may not be used directly by publishers, but
#   only when bound to other exchanges. Internal exchanges are used to
#   construct wiring that is not visible to applications. This is a RabbitMQ-specific
#   extension.
#
# @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should
#   not wait for a reply method. If the server could not complete the
#   method it will raise a channel or connection exception.
#
# @example Using topic exchange to deliver relevant news updates
#   AMQP.connect do |connection|
#     channel  = AMQP::Channel.new(connection)
#     exchange = channel.topic("pub/sub")
#
#     # Subscribers.
#     channel.queue("development").bind(exchange, :key => "technology.dev.#").subscribe do |payload|
#       puts "A new dev post: '#{payload}'"
#     end
#     channel.queue("ruby").bind(exchange, :key => "technology.#.ruby").subscribe do |payload|
#       puts "A new post about Ruby: '#{payload}'"
#     end
#
#     # Let's publish some data.
# exchange.publish "Ruby post", :routing_key => "technology.dev.ruby" # exchange.publish "Erlang post", :routing_key => "technology.dev.erlang" # exchange.publish "Sinatra post", :routing_key => "technology.web.ruby" # exchange.publish "Jewelery post", :routing_key => "jewelery.ruby" # end # # # @example Using topic exchange to deliver geographically-relevant data # AMQP.connect do |connection| # channel = AMQP::Channel.new(connection) # exchange = channel.topic("pub/sub") # # # Subscribers. # channel.queue("americas.north").bind(exchange, :routing_key => "americas.north.#").subscribe do |headers, payload| # puts "An update for North America: #{payload}, routing key is #{headers.routing_key}" # end # channel.queue("americas.south").bind(exchange, :routing_key => "americas.south.#").subscribe do |headers, payload| # puts "An update for South America: #{payload}, routing key is #{headers.routing_key}" # end # channel.queue("us.california").bind(exchange, :routing_key => "americas.north.us.ca.*").subscribe do |headers, payload| # puts "An update for US/California: #{payload}, routing key is #{headers.routing_key}" # end # channel.queue("us.tx.austin").bind(exchange, :routing_key => "#.tx.austin").subscribe do |headers, payload| # puts "An update for Austin, TX: #{payload}, routing key is #{headers.routing_key}" # end # channel.queue("it.rome").bind(exchange, :routing_key => "europe.italy.rome").subscribe do |headers, payload| # puts "An update for Rome, Italy: #{payload}, routing key is #{headers.routing_key}" # end # channel.queue("asia.hk").bind(exchange, :routing_key => "asia.southeast.hk.#").subscribe do |headers, payload| # puts "An update for Hong Kong: #{payload}, routing key is #{headers.routing_key}" # end # # exchange.publish("San Diego update", :routing_key => "americas.north.us.ca.sandiego"). # publish("Berkeley update", :routing_key => "americas.north.us.ca.berkeley"). # publish("San Francisco update", :routing_key => "americas.north.us.ca.sanfrancisco"). 
#       publish("New York update",      :routing_key => "americas.north.us.ny.newyork").
#       publish("São Paolo update",     :routing_key => "americas.south.brazil.saopaolo").
#       publish("Hong Kong update",     :routing_key => "asia.southeast.hk.hongkong").
#       publish("Kyoto update",         :routing_key => "asia.southeast.japan.kyoto").
#       publish("Shanghai update",      :routing_key => "asia.southeast.prc.shanghai").
#       publish("Rome update",          :routing_key => "europe.italy.roma").
#       publish("Paris update",         :routing_key => "europe.france.paris")
#   end
#
# @see Exchange
# @see Exchange#initialize
# @see http://www.rabbitmq.com/faq.html#Binding-and-Routing RabbitMQ FAQ on routing & wildcards
# @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 3.1.3.3)
#
# @return [Exchange]
# @api public
def topic(name = 'amq.topic', opts = {}, &block)
  known = find_exchange(name)
  # nothing registered under this name yet: instantiate a new exchange and register it
  return register_exchange(Exchange.new(self, :topic, name, opts, &block)) unless known

  # an exchange with this name is already known to the channel: check that the
  # requested attributes match it, then hand the existing instance to the caller
  requested = Exchange.add_default_options(:topic, name, opts, block)
  validate_parameters_match!(known, requested, :exchange)

  block.call(known) if block
  known
end

# Defines, initializes and returns a headers Exchange instance.
#
# Learn more about headers exchanges in {Exchange Exchange class documentation}.
#
# @param [String] name (amq.match) Exchange name.
#
# @option opts [Boolean] :passive (false) If set, the server will not create the exchange if it does not
#   already exist. The client can use this to check whether an exchange
#   exists without modifying the server state.
#
# @option opts [Boolean] :durable (false) If set when creating a new exchange, the exchange will be marked as
#   durable. Durable exchanges and their bindings are recreated upon a server
#   restart (information about them is persisted). Non-durable (transient) exchanges
#   do not survive if/when a server restarts (information about them is stored exclusively
#   in RAM).
#
#
# @option opts [Boolean] :auto_delete (false) If set, the exchange is deleted when all queues have finished
#   using it. The server waits for a short period of time before
#   determining the exchange is unused to give time to the client code
#   to bind a queue to it.
#
# @option opts [Boolean] :internal (default false) If set, the exchange may not be used directly by publishers, but
#   only when bound to other exchanges. Internal exchanges are used to
#   construct wiring that is not visible to applications. This is a RabbitMQ-specific
#   extension.
#
# @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should
#   not wait for a reply method. If the server could not complete the
#   method it will raise a channel or connection exception.
#
#
# @example Using headers exchange to route messages based on multiple attributes (OS, architecture, # of cores)
#
#   puts "=> Headers routing example"
#   puts
#   AMQP.start do |connection|
#     channel   = AMQP::Channel.new(connection)
#     channel.on_error do |ch, channel_close|
#       puts "A channel-level exception: #{channel_close.inspect}"
#     end
#
#     exchange = channel.headers("amq.match", :durable => true)
#
#     channel.queue("", :auto_delete => true).bind(exchange, :arguments => { 'x-match' => 'all', :arch => "x64", :os => 'linux' }).subscribe do |metadata, payload|
#       puts "[linux/x64] Got a message: #{payload}"
#     end
#     channel.queue("", :auto_delete => true).bind(exchange, :arguments => { 'x-match' => 'all', :arch => "x32", :os => 'linux' }).subscribe do |metadata, payload|
#       puts "[linux/x32] Got a message: #{payload}"
#     end
#     channel.queue("", :auto_delete => true).bind(exchange, :arguments => { 'x-match' => 'any', :os => 'linux', :arch => "__any__" }).subscribe do |metadata, payload|
#       puts "[linux] Got a message: #{payload}"
#     end
#     channel.queue("", :auto_delete => true).bind(exchange, :arguments => { 'x-match' => 'any', :os => 'macosx', :cores => 8 }).subscribe do |metadata, payload|
#       puts "[macosx|octocore] Got a message: #{payload}"
#     end
#
#
#     EventMachine.add_timer(0.5) do
#       exchange.publish "For linux/x64",   :headers => { :arch => "x64", :os => 'linux' }
#       exchange.publish "For linux/x32",   :headers => { :arch => "x32", :os => 'linux' }
#       exchange.publish "For linux",       :headers => { :os => 'linux' }
#       exchange.publish "For OS X",        :headers => { :os => 'macosx' }
#       exchange.publish "For solaris/x64", :headers => { :os => 'solaris', :arch => 'x64' }
#       exchange.publish "For ocotocore",   :headers => { :cores => 8 }
#     end
#
#
#     show_stopper = Proc.new do
#       $stdout.puts "Stopping..."
#       connection.close {
#         EventMachine.stop { exit }
#       }
#     end
#
#     Signal.trap "INT", show_stopper
#     EventMachine.add_timer(2, show_stopper)
#   end
#
#
#
# @see Exchange
# @see Exchange#initialize
# @see Channel#default_exchange
# @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 3.1.3.3)
#
# @return [Exchange]
# @api public
def headers(name = 'amq.match', opts = {}, &block)
  known = find_exchange(name)
  # nothing registered under this name yet: instantiate a new exchange and register it
  return register_exchange(Exchange.new(self, :headers, name, opts, &block)) unless known

  # an exchange with this name is already known to the channel: check that the
  # requested attributes match it, then hand the existing instance to the caller
  requested = Exchange.add_default_options(:headers, name, opts, block)
  validate_parameters_match!(known, requested, :exchange)

  block.call(known) if block
  known
end

# @endgroup



# @group Declaring queues

# Declares and returns a Queue instance associated with this channel. See {Queue Queue class documentation} for
# more information about queues.
#
# To make broker generate queue name for you (a classic example is exclusive
# queues that are only used for a short period of time), pass empty string
# as name value. Then queue will get its name as soon as broker's response
# (queue.declare-ok) arrives. Note that in this case, block is required.
#
#
# Like for exchanges, queue names starting with 'amq.' cannot be modified and
# should not be used by applications.
# # @example Declaring a queue in a mail delivery app using Channel#queue without a block # AMQP.connect do |connection| # AMQP::Channel.new(connection) do |ch| # # message producers will be able to send messages to this queue # # using direct exchange and routing key = "mail.delivery" # queue = ch.queue("mail.delivery", :durable => true) # queue.subscribe do |headers, payload| # # ... # end # end # end # # @example Declaring a server-named exclusive queue that receives all messages related to events, using a block. # AMQP.connect do |connection| # AMQP::Channel.new(connection) do |ch| # # message producers will be able to send messages to this queue # # using amq.topic exchange with routing keys that begin with "events" # ch.queue("", :exclusive => true) do |queue| # queue.bind(ch.exchange("amq.topic"), :routing_key => "events.#").subscribe do |headers, payload| # # ... # end # end # end # end # # @param [String] name Queue name. If you want a server-named queue, you can omit the name (note that in this case, using block is mandatory). # See {Queue Queue class documentation} for discussion of queue lifecycles and when use of server-named queues # is optimal. # # @option opts [Boolean] :passive (false) If set, the server will not create the exchange if it does not # already exist. The client can use this to check whether an exchange # exists without modifying the server state. # # @option opts [Boolean] :durable (false) If set when creating a new exchange, the exchange will be marked as # durable. Durable exchanges and their bindings are recreated upon a server # restart (information about them is persisted). Non-durable (transient) exchanges # do not survive if/when a server restarts (information about them is stored exclusively # in RAM). Any remaining messages in the queue will be purged when the queue # is deleted regardless of the message's persistence setting. 
#
#
# @option opts [Boolean] :auto_delete (false) If set, the queue is deleted when all consumers have finished
#   using it.
#
# @option opts [Boolean] :exclusive (false) Exclusive queues may only be used by a single connection.
#   Exclusivity also implies that queue is automatically deleted when connection
#   is closed. Only one consumer is allowed to remove messages from exclusive queue.
#
# @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should
#   not wait for a reply method. If the server could not complete the
#   method it will raise a channel or connection exception.
#
# @yield [queue, declare_ok] Yields successfully declared queue instance and AMQP method (queue.declare-ok) instance. The latter is optional.
# @yieldparam [Queue] queue Queue that is successfully declared and is ready to be used.
# @yieldparam [AMQP::Protocol::Queue::DeclareOk] declare_ok AMQP method (queue.declare-ok) instance.
#
# @raise [ArgumentError] if name is nil
#
# @see Queue
# @see Queue#initialize
# @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 2.1.4)
#
# @return [Queue]
# @api public
def queue(name = AMQ::Protocol::EMPTY_STRING, opts = {}, &block)
  raise ArgumentError.new("queue name must not be nil; if you want broker to generate queue name for you, pass an empty string") if name.nil?

  # the cache is only consulted for explicitly named queues; an empty name
  # always results in a new declaration
  cached = find_queue(name) if name && !name.empty?
  return self.queue!(name, opts, &block) unless cached

  # a queue with this name is already cached on the channel: check that the
  # requested attributes match it, then hand the cached instance to the caller
  requested = Queue.add_default_options(name, opts, block)
  validate_parameters_match!(cached, requested, :queue)

  block.call(cached) if block
  cached
end

# Same as {Channel#queue} but when queue with the same name already exists in this channel
# object's cache, this method will replace existing queue with a newly defined one. Consider
# using {Channel#queue} instead.
#
# @see Channel#queue
#
# @return [Queue]
# @api public
def queue!(name, opts = {}, &block)
  declared =
    if block
      # Adapts the caller's block to what Queue passes to its declaration callback:
      # one-argument blocks receive just the queue; other blocks receive the queue
      # registered under the declared name plus the consumer and message counts.
      adapter = proc do |q, declare_ok|
        if block.arity == 1
          block.call(q)
        else
          registered = find_queue(declare_ok.queue)
          block.call(registered, declare_ok.consumer_count, declare_ok.message_count)
        end
      end

      Queue.new(self, name, opts, &adapter)
    else
      Queue.new(self, name, opts)
    end

  register_queue(declared)
end

# Raw queues cache of this channel (the @queues collection itself).
#
# NOTE(review): this class defines #queues a second time further down
# (returning @queues.values); the later definition wins at load time.
#
# @return [Array] Queues cache for this channel
# @api plugin
# @private
def queues
  @queues
end # queues

# @endgroup

# @group Channel lifecycle

# Opens AMQP channel: sends channel.open, registers this channel on the
# connection under its id and switches status to :opening. The given block
# replaces any previously defined :open callback.
#
# @note Instantiated channels are opened by default. This method should only be used for error recovery after network connection loss.
# @api public
def open(&block)
  open_frame = AMQ::Protocol::Channel::Open.encode(@id, AMQ::Protocol::EMPTY_STRING)
  @connection.send_frame(open_frame)

  @connection.channels[@id] = self
  self.status = :opening

  self.redefine_callback(:open, &block)
end
alias reopen open

# @return [Boolean] true if channel status is :opened or :opening.
# @api public
def open?
  current = self.status
  current == :opened || current == :opening
end # open?

# Takes a block that will be deferred till the moment when channel is considered open
# (channel.open-ok is received from the broker). If you need to delay an operation
# till the moment channel is open, this method is what you are looking for.
#
# Multiple callbacks are supported. If the channel is already open when this method
# is called, the block is executed immediately.
#
# @api public
def once_open(&block)
  @channel_is_open_deferrable.callback do
    # The channel may have been closed before the deferred operation
    # got its turn; in that case the operation is silently dropped.
    block.call if open?
  end
end # once_open(&block)
alias once_opened once_open

# @return [Boolean] true if channel status is :closing.
# @api public
def closing?
  :closing == self.status
end

# Closes AMQP channel.
#
# Once the channel is open, marks it as :closing, sends channel.close with the
# given reply code/text and class/method ids, and installs the block as the
# :close callback.
#
# @api public
def close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block)
  self.once_open do
    self.status = :closing

    close_frame = AMQ::Protocol::Channel::Close.encode(@id, reply_code, reply_text, class_id, method_id)
    @connection.send_frame(close_frame)

    self.redefine_callback(:close, &block)
  end
end

# @endgroup

# @group QoS and flow handling

# Asks the peer to pause or restart the flow of content data sent to a consumer.
# This is a simple flow-control mechanism that a peer can use to avoid overflowing its
# queues or otherwise finding itself receiving more messages than it can process. Note that
# this method is not intended for window control. It does not affect contents returned to
# Queue#get callers.
#
# @param [Boolean] active Desired flow state.
#
# @return [Channel] self
#
# @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.5.2.3.)
# @api public
def flow(active = false, &block)
  flow_frame = AMQ::Protocol::Channel::Flow.encode(@id, active)
  @connection.send_frame(flow_frame)

  self.redefine_callback(:flow, &block)
  self
end

# @return [Boolean] True if flow in this channel is active (messages will be delivered to consumers that use this channel).
#
# @api public
def flow_is_active?
  @flow_is_active
end # flow_is_active?

# Requests the given prefetch count via #qos once the channel is open.
#
# @param [Integer] count Message count
# @param [Boolean] global (false)
#
# @return [Channel] self
#
# @api public
def prefetch(count, global = false, &block)
  # RabbitMQ as of 2.3.1 does not support prefetch_size, hence the fixed 0.
  self.once_open { self.qos(0, count, global, &block) }

  self
end

# @endgroup

# @group Message acknowledgements

# Acknowledge one or all messages on the channel.
#
# @api public
# @see #reject
# @see #recover
# @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.8.3.13.)
def acknowledge(delivery_tag, multiple = false)
  ack_frame = AMQ::Protocol::Basic::Ack.encode(self.id, delivery_tag, multiple)
  @connection.send_frame(ack_frame)

  self
end # acknowledge(delivery_tag, multiple = false)

# Reject a message with given delivery tag. Sends basic.nack when multi is
# set and basic.reject otherwise.
#
# @return [Channel] self
#
# @api public
# @see #acknowledge
# @see #recover
# @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.8.3.14.)
def reject(delivery_tag, requeue = true, multi = false)
  rejection_frame =
    if multi
      AMQ::Protocol::Basic::Nack.encode(self.id, delivery_tag, multi, requeue)
    else
      AMQ::Protocol::Basic::Reject.encode(self.id, delivery_tag, requeue)
    end
  @connection.send_frame(rejection_frame)

  self
end # reject(delivery_tag, requeue = true, multi = false)

# Notifies AMQ broker that consumer has recovered and unacknowledged messages need
# to be redelivered.
#
# @return [Channel] self
#
# @note RabbitMQ as of 2.3.1 does not support basic.recover with requeue = false.
# @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.8.3.16.)
# @see #acknowledge
# @api public
def recover(requeue = true, &block)
  recover_frame = AMQ::Protocol::Basic::Recover.encode(@id, requeue)
  @connection.send_frame(recover_frame)

  self.redefine_callback(:recover, &block)
  self
end # recover(requeue = true, &block)

# @endgroup

# @group Transactions

# Sets the channel to use standard transactions. One must use this method at least
# once on a channel before using #tx_commit or #tx_rollback methods.
#
# @return [Channel] self
#
# @api public
def tx_select(&block)
  select_frame = AMQ::Protocol::Tx::Select.encode(@id)
  @connection.send_frame(select_frame)

  self.redefine_callback(:tx_select, &block)
  self
end # tx_select(&block)

# Commits AMQP transaction.
#
# @return [Channel] self
#
# @api public
def tx_commit(&block)
  commit_frame = AMQ::Protocol::Tx::Commit.encode(@id)
  @connection.send_frame(commit_frame)

  self.redefine_callback(:tx_commit, &block)
  self
end # tx_commit(&block)

# Rolls AMQP transaction back.
#
# @return [Channel] self
#
# @api public
def tx_rollback(&block)
  rollback_frame = AMQ::Protocol::Tx::Rollback.encode(@id)
  @connection.send_frame(rollback_frame)

  self.redefine_callback(:tx_rollback, &block)
  self
end # tx_rollback(&block)

# @endgroup

# @group Error handling

# Defines a callback that will be executed when channel is closed after
# channel-level exception.
#
# @api public
def on_error(&block)
  self.define_callback(:error, &block)
end

# @endgroup

# @group Publisher Confirms

# Once the channel is open, switches it to publisher confirms mode:
# resets the publisher index, makes every :after_publish event increment it
# and sends confirm.select to the broker. Unless nowait is set, the block
# becomes the :confirm_select callback.
#
# NOTE(review): the nowait + block check runs inside the deferred once_open
# block, so the ArgumentError is raised when that block runs rather than
# necessarily at the call site.
#
# @param [Boolean] nowait Passed through to confirm.select.
# @raise [ArgumentError] When nowait is true and a block is given.
def confirm_select(nowait = false, &block)
  self.once_open do
    raise ArgumentError, "confirm.select with nowait = true and a callback makes no sense" if nowait && block

    @uses_publisher_confirmations = true
    reset_publisher_index!

    self.redefine_callback(:confirm_select, &block) unless nowait
    self.redefine_callback(:after_publish) { increment_publisher_index! }

    select_frame = AMQ::Protocol::Confirm::Select.encode(@id, nowait)
    @connection.send_frame(select_frame)

    self
  end
end

# @endgroup

#
# Implementation
#

# Resets channel state (for example, list of registered queue objects and so on).
#
# Most of the time, this method is not
# called by application code.
#
# @private
# @api plugin
def reset(&block)
  # See AMQP::Channel
  self.reset_state!

  # there is no way to reset a deferrable; we have to use a new instance. MK.
  fresh_deferrable = AMQP::Deferrable.new
  @channel_is_open_deferrable = fresh_deferrable
  fresh_deferrable.callback(&block)

  @connection.on_connection do
    # Read the instance variable here (not the local above): it is whatever
    # deferrable is current at the time the connection comes up.
    @channel_is_open_deferrable.succeed

    prefetch_count = @options[:prefetch]
    self.prefetch(prefetch_count, false) if prefetch_count
  end
end

# Overrides superclass method to also re-create @channel_is_open_deferrable.
# Notifies every registered queue and exchange first, then runs the
# :after_connection_interruption callback and resets channel state. The channel
# id is released only when the channel is not in automatic recovery mode.
#
# @api plugin
# @private
def handle_connection_interruption(method = nil)
  @queues.each_value { |queue| queue.handle_connection_interruption(method) }
  @exchanges.each_value { |exchange| exchange.handle_connection_interruption(method) }

  self.exec_callback_yielding_self(:after_connection_interruption)
  self.reset_state!

  @connection.release_channel_id(@id) unless auto_recovering?
  @channel_is_open_deferrable = AMQP::Deferrable.new
end

# @return [Boolean] true if this channel uses automatic recovery mode
def auto_recovering?
  @auto_recovery
end # auto_recovering?

# @return [Hash]
def consumers
  @consumers
end # consumers

# @return [Array] Collection of queues that were declared on this channel.
def queues
  @queues.values
end

# @return [Array] Collection of exchanges that were declared on this channel.
def exchanges
  @exchanges.values
end

# AMQP connection this channel belongs to.
#
# @return [AMQP::Connection] Connection this channel belongs to.
def connection
  @connection
end # connection

# Synchronizes given block using this channel's mutex.
# @api public
def synchronize(&block)
  @mutex.synchronize(&block)
end

# @group QoS and flow handling

# Requests a specific quality of service. The QoS can be specified for the current channel
# or for all channels on the connection.
#
# @return [Channel] self
#
# @note RabbitMQ as of 2.3.1 does not support prefetch_size.
# @api public
def qos(prefetch_size = 0, prefetch_count = 32, global = false, &block)
  qos_frame = AMQ::Protocol::Basic::Qos.encode(@id, prefetch_size, prefetch_count, global)
  @connection.send_frame(qos_frame)

  self.redefine_callback(:qos, &block)
  self
end # qos

# @endgroup

# @group Error handling

# Defines a callback that will be executed after TCP connection is interrupted (typically because of a network failure).
# Only one callback can be defined (the one defined last replaces previously added ones).
#
# @api public
def on_connection_interruption(&block)
  self.redefine_callback(:after_connection_interruption, &block)
end # on_connection_interruption(&block)
alias after_connection_interruption on_connection_interruption

# Defines a callback that will be executed after TCP connection has recovered after a network failure
# but before AMQP connection is re-opened.
# Only one callback can be defined (the one defined last replaces previously added ones).
#
# @api public
def before_recovery(&block)
  self.redefine_callback(:before_recovery, &block)
end # before_recovery(&block)

# Runs this channel's :before_recovery callback, then asks every registered
# queue and exchange (in that order) to run theirs.
#
# @private
def run_before_recovery_callbacks
  self.exec_callback_yielding_self(:before_recovery)

  @queues.each_value { |queue| queue.run_before_recovery_callbacks }
  @exchanges.each_value { |exchange| exchange.run_before_recovery_callbacks }
end

# Defines a callback that will be executed after AMQP connection has recovered after a network failure.
# Only one callback can be defined (the one defined last replaces previously added ones).
#
# @api public
def on_recovery(&block)
  self.redefine_callback(:after_recovery, &block)
end # on_recovery(&block)
alias after_recovery on_recovery

# Runs this channel's :after_recovery callback, then asks every registered
# queue and exchange (in that order) to run theirs.
#
# @private
def run_after_recovery_callbacks
  self.exec_callback_yielding_self(:after_recovery)

  @queues.each_value { |queue| queue.run_after_recovery_callbacks }
  @exchanges.each_value { |exchange| exchange.run_after_recovery_callbacks }
end

# Called by associated connection object when AMQP connection has been re-established
# (for example, after a network failure). Does nothing unless the channel is in
# automatic recovery mode.
#
# @api plugin
def auto_recover
  return unless auto_recovering?

  self.open do
    # exchanges must be recovered first because queue recovery includes recovery of bindings. MK.
    @exchanges.each_value { |exchange| exchange.auto_recover }
    @queues.each_value { |queue| queue.auto_recover }
  end
end # auto_recover

# @endgroup

# Publisher index is an index of the last message since
# the confirmations were activated, started with 0. It's
# incremented by 1 every time a message is published.
# This is done on both client and server, hence
# acknowledged messages can be matched via their delivery-tag.
#
# @return [Integer] Current publisher index.
# @api public
def publisher_index
  @publisher_index ||= 0
end

# Resets publisher index to 0
#
# @api plugin
def reset_publisher_index!
  @publisher_index = 0
end

# This method is executed after publishing of each message via {Exchange#publish}.
# Currently it just increments publisher index by 1, so messages
# can be actually matched.
#
# A publisher index that was never initialized is treated as 0, so the
# first increment yields 1 instead of failing on nil.
#
# @return [Integer] Publisher index after the increment.
# @api plugin
def increment_publisher_index!
  @publisher_index = (@publisher_index || 0) + 1
end

# @return [Boolean]
def uses_publisher_confirmations?
  @uses_publisher_confirmations
end # uses_publisher_confirmations?

# Registers a callback for Basic.Ack frames received from the broker
# (it is run by #handle_basic_ack). Without a block nothing is registered.
# This method does not itself turn publisher confirmations on; that is done
# by #confirm_select.
#
# @param [Boolean] nowait Not used by this method.
#
# @yield [basic_ack] Callback which will be executed every time we receive Basic.Ack from the broker.
# @yieldparam [AMQ::Protocol::Basic::Ack] basic_ack Protocol method class instance.
#
# @return [self] self.
def on_ack(nowait = false, &block)
  self.define_callback(:ack, &block) if block

  self
end

# Register error callback for Basic.Nack. It's called
# when message(s) is rejected. Without a block nothing is registered.
#
# @return [self] self
def on_nack(&block)
  self.define_callback(:nack, &block) if block

  self
end

# Handler for Confirm.Select-Ok. By default, it just
# executes the :confirm_select callback (installed by #confirm_select)
# with a single argument, a protocol method class
# instance (an instance of AMQ::Protocol::Confirm::SelectOk).
# The callback is run via exec_callback_once, since Confirm.Select
# is supposed to be sent just once.
#
# @api plugin
def handle_select_ok(method)
  self.exec_callback_once(:confirm_select, method)
end

# Handler for Basic.Ack. By default, it just
# executes hook specified via the #on_ack method
# with a single argument, a protocol method class
# instance (an instance of AMQ::Protocol::Basic::Ack).
#
# @api plugin
def handle_basic_ack(method)
  self.exec_callback(:ack, method)
end

# Handler for Basic.Nack. By default, it just
# executes hook specified via the #on_nack method
# with a single argument, a protocol method class
# instance (an instance of AMQ::Protocol::Basic::Nack).
#
# @api plugin
def handle_basic_nack(method)
  self.exec_callback(:nack, method)
end

#
# Implementation
#

# Stores the exchange in this channel's exchanges cache under its name.
#
# @raise [ArgumentError] When exchange is nil.
def register_exchange(exchange)
  if exchange.nil?
    raise ArgumentError, "argument is nil!"
  end

  @exchanges[exchange.name] = exchange
end # register_exchange(exchange)

# Finds exchange in the exchanges cache on this channel by name. Exchange only exists in the cache if
# it was previously instantiated on this channel.
#
# @param [String] name Exchange name
# @return [AMQP::Exchange] Exchange (if found)
# @api plugin
def find_exchange(name)
  @exchanges[name]
end

# Stores the queue in this channel's queues cache under its name.
#
# @raise [ArgumentError] When queue is nil.
# @api plugin
# @private
def register_queue(queue)
  if queue.nil?
    raise ArgumentError, "argument is nil!"
  end

  @queues[queue.name] = queue
end # register_queue(queue)

# Looks a queue up by name in this channel's queues cache.
#
# @api plugin
# @private
def find_queue(name)
  @queues[name]
end

# Callback names that survive #reset_state!.
RECOVERY_EVENTS = [:after_connection_interruption, :before_recovery, :after_recovery].freeze

# Re-initializes per-channel bookkeeping: flow becomes active again, every
# "awaiting ..." list is replaced by a fresh empty array, all callbacks except
# the ones named in RECOVERY_EVENTS are dropped and publisher confirmations
# are marked as not in use.
#
# @api plugin
# @private
def reset_state!
  @flow_is_active = true

  @queues_awaiting_declare_ok    = []
  @exchanges_awaiting_declare_ok = []

  @queues_awaiting_delete_ok     = []
  @exchanges_awaiting_delete_ok  = []

  @queues_awaiting_purge_ok      = []
  @queues_awaiting_bind_ok       = []
  @queues_awaiting_unbind_ok     = []
  @consumers_awaiting_consume_ok = []
  @consumers_awaiting_cancel_ok  = []

  @queues_awaiting_get_response  = []

  # Recovery-related callbacks must outlive a state reset.
  @callbacks = @callbacks.keep_if { |event, _| RECOVERY_EVENTS.include?(event) }
  @uses_publisher_confirmations = false
end # reset_state!
# @api plugin # @private def handle_open_ok(open_ok) self.status = :opened self.exec_callback_once_yielding_self(:open, open_ok) end # @api plugin # @private def handle_close_ok(close_ok) self.status = :closed self.connection.clear_frames_on(self.id) self.exec_callback_once_yielding_self(:close, close_ok) @connection.release_channel_id(@id) end # @api plugin # @private def handle_close(channel_close) self.status = :closed self.connection.clear_frames_on(self.id) self.exec_callback_yielding_self(:error, channel_close) end self.handle(AMQ::Protocol::Channel::OpenOk) do |connection, frame| channel = connection.channels[frame.channel] channel.handle_open_ok(frame.decode_payload) end self.handle(AMQ::Protocol::Channel::CloseOk) do |connection, frame| method = frame.decode_payload channels = connection.channels channel = channels[frame.channel] channels.delete(channel) channel.handle_close_ok(method) end self.handle(AMQ::Protocol::Channel::Close) do |connection, frame| method = frame.decode_payload channels = connection.channels channel = channels[frame.channel] connection.send_frame(AMQ::Protocol::Channel::CloseOk.encode(frame.channel)) channel.handle_close(method) end self.handle(AMQ::Protocol::Basic::QosOk) do |connection, frame| channel = connection.channels[frame.channel] channel.exec_callback(:qos, frame.decode_payload) end self.handle(AMQ::Protocol::Basic::RecoverOk) do |connection, frame| channel = connection.channels[frame.channel] channel.exec_callback(:recover, frame.decode_payload) end self.handle(AMQ::Protocol::Channel::FlowOk) do |connection, frame| channel = connection.channels[frame.channel] method = frame.decode_payload channel.flow_is_active = method.active channel.exec_callback(:flow, method) end self.handle(AMQ::Protocol::Tx::SelectOk) do |connection, frame| channel = connection.channels[frame.channel] channel.exec_callback(:tx_select, frame.decode_payload) end self.handle(AMQ::Protocol::Tx::CommitOk) do |connection, frame| channel = 
connection.channels[frame.channel] channel.exec_callback(:tx_commit, frame.decode_payload) end self.handle(AMQ::Protocol::Tx::RollbackOk) do |connection, frame| channel = connection.channels[frame.channel] channel.exec_callback(:tx_rollback, frame.decode_payload) end self.handle(AMQ::Protocol::Confirm::SelectOk) do |connection, frame| method = frame.decode_payload channel = connection.channels[frame.channel] channel.handle_select_ok(method) end self.handle(AMQ::Protocol::Basic::Ack) do |connection, frame| method = frame.decode_payload channel = connection.channels[frame.channel] channel.handle_basic_ack(method) end self.handle(AMQ::Protocol::Basic::Nack) do |connection, frame| method = frame.decode_payload channel = connection.channels[frame.channel] channel.handle_basic_nack(method) end protected @private def validate_parameters_match!(entity, parameters, type) unless entity.opts.values_at(*@parameter_checks[type]) == parameters.values_at(*@parameter_checks[type]) || parameters[:passive] raise AMQP::IncompatibleOptionsError.new(entity.name, entity.opts, parameters) end end # validate_parameters_match!(entity, parameters, type) end # Channel end # AMQP