# encoding: utf-8
require "amqp/int_allocator"
require "amqp/exchange"
require "amqp/queue"
module AMQP
# h2. What are AMQP channels
#
# To quote {http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification}:
#
# AMQP is a multi-channelled protocol. Channels provide a way to multiplex
# a heavyweight TCP/IP connection into several light weight connections.
# This makes the protocol more “firewall friendly” since port usage is predictable.
# It also means that traffic shaping and other network QoS features can be easily employed.
# Channels are independent of each other and can perform different functions simultaneously
# with other channels, the available bandwidth being shared between the concurrent activities.
#
# h2. Opening a channel
#
# *Channels are opened asynchronously*. There are two ways to do it: using a callback or pseudo-synchronous mode.
#
# @example Opening a channel with a callback
# # this assumes EventMachine reactor is running
# AMQP.connect("amqp://guest:guest@dev.rabbitmq.com:5672") do |client|
# AMQP::Channel.new(client) do |channel, open_ok|
# # when this block is executed, channel is open and ready for use
# end
# end
#
#
#
# Unless your application needs multiple channels, this approach is recommended. Alternatively,
# AMQP::Channel can be instantiated without a block. Then returned channel is not immediately open,
# however, it can be used as if it was a synchronous, blocking method:
#
# @example Instantiating a channel that will be open eventually
# # this assumes EventMachine reactor is running
# AMQP.connect("amqp://guest:guest@dev.rabbitmq.com:5672") do |client|
# channel = AMQP::Channel.new(client)
# exchange = channel.default_exchange
#
# # ...
# end
#
#
#
# Even though in the example above channel isn't immediately open, it is safe to declare exchanges using
# it. Exchange declaration will be delayed until after channel is open. Same applies to queue declaration
# and other operations on exchanges and queues. Library methods that rely on channel being open will be
# enqueued and executed in a FIFO manner when broker confirms channel opening.
# Note, however, that *this "pseudo-synchronous mode" is easy to abuse and introduce race conditions AMQP gem
# cannot resolve for you*. AMQP is an inherently asynchronous protocol and AMQP gem embraces this fact.
#
#
# h2. Key methods
#
# Key methods of Channel class are
#
# * {Channel#queue}
# * {Channel#default_exchange}
# * {Channel#direct}
# * {Channel#fanout}
# * {Channel#topic}
# * {Channel#close}
#
# refer to documentation for those methods for usage examples.
#
# Channel provides a number of convenience methods that instantiate queues and exchanges
# of various types associated with this channel:
#
# * {Channel#queue}
# * {Channel#default_exchange}
# * {Channel#direct}
# * {Channel#fanout}
# * {Channel#topic}
#
#
# h2. Error handling
#
# It is possible (and, indeed, recommended) to handle channel-level exceptions by defining an errback using #on_error:
#
# @example Queue declaration with incompatible attributes results in a channel-level exception
# AMQP.start("amqp://guest:guest@dev.rabbitmq.com:5672") do |connection, open_ok|
# AMQP::Channel.new do |channel, open_ok|
# puts "Channel ##{channel.id} is now open!"
#
# channel.on_error do |ch, close|
# puts "Handling channel-level exception"
#
# connection.close {
# EM.stop { exit }
# }
# end
#
# EventMachine.add_timer(0.4) do
# # these two definitions result in a race condition. For sake of this example,
# # however, it does not matter. Whatever definition succeeds first, 2nd one will
# # cause a channel-level exception (because attributes are not identical)
# AMQP::Queue.new(channel, "amqpgem.examples.channel_exception", :auto_delete => true, :durable => false) do |queue|
# puts "#{queue.name} is ready to go"
# end
#
# AMQP::Queue.new(channel, "amqpgem.examples.channel_exception", :auto_delete => true, :durable => true) do |queue|
# puts "#{queue.name} is ready to go"
# end
# end
# end
# end
#
#
#
# When channel-level exception is indicated by the broker and errback defined using #on_error is run, channel is already
# closed and all queue and exchange objects associated with this channel are reset. The recommended way to recover from
# channel-level exceptions is to open a new channel and re-instantiate queues, exchanges and bindings your application
# needs.
#
#
#
# h2. Closing a channel
#
# Channels are opened when a Channel object is instantiated and closed using the {#close} method when the
# application no longer needs them.
#
# @example Closing a channel your application no longer needs
# # this assumes EventMachine reactor is running
# AMQP.connect("amqp://guest:guest@dev.rabbitmq.com:5672") do |client|
# AMQP::Channel.new(client) do |channel, open_ok|
# channel.close do |close_ok|
# # when this block is executed, channel is successfully closed
# end
# end
# end
#
#
#
#
# h2. RabbitMQ extensions.
#
# AMQP gem supports several RabbitMQ extensions that extend Channel functionality.
# Learn more in {file:docs/VendorSpecificExtensions.textile}
#
# @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 2.2.5)
class Channel < AMQ::Client::Channel
#
# API
#
# AMQP connection this channel is part of
# @return [Connection]
attr_reader :connection
# {#conn} is a shorter alias for the {#connection} reader declared above.
alias :conn :connection
# Status of this channel (one of: :opening, :closing, :open, :closed).
# Read-only from the outside: only a reader is declared here.
# @return [Symbol]
attr_reader :status
# @param [AMQP::Session] connection Connection to open this channel on. If not given, default AMQP
# connection (accessible via {AMQP.connection}) will be used.
# @param [Integer] id Channel id. Must not be greater than max channel id client and broker
# negotiated on during connection setup. Almost always the right thing to do
# is to let AMQP gem pick channel identifier for you. If you want to get next
# channel id, use {AMQP::Channel.next_channel_id} (it is thread-safe).
# @param [Hash] options A hash of options
#
# @example Instantiating a channel for default connection (accessible as AMQP.connection)
#
# AMQP.connect do |connection|
# AMQP::Channel.new(connection) do |channel, open_ok|
# # channel is ready: set up your messaging flow by creating exchanges,
# # queues, binding them together and so on.
# end
# end
#
# @example Instantiating a channel for explicitly given connection
#
# AMQP.connect do |connection|
# AMQP::Channel.new(connection) do |channel, open_ok|
# # ...
# end
# end
#
# @example Instantiating a channel with a :prefetch option
#
# AMQP.connect do |connection|
# AMQP::Channel.new(connection, AMQP::Channel.next_channel_id, :prefetch => 5) do |channel, open_ok|
# # ...
# end
# end
#
#
# @option options [Integer] :prefetch (nil) Specifies number of messages to prefetch. Channel-specific. See {AMQP::Channel#prefetch}.
# @option options [Boolean] :auto_recovery (nil) Turns on automatic network failure recovery mode for this channel.
#
# @yield [channel, open_ok] Yields open channel instance and AMQP method (channel.open-ok) instance. The latter is optional.
# @yieldparam [Channel] channel Channel that is successfully open
# @yieldparam [AMQP::Protocol::Channel::OpenOk] open_ok AMQP method (channel.open-ok) instance
#
#
# @see AMQP::Channel#prefetch
# @api public
def initialize(connection = nil, id = self.class.next_channel_id, options = {}, &block)
  raise 'AMQP can only be used from within EM.run {}' unless EM.reactor_running?

  # explicit connection first, then AMQP.connection, then whatever AMQP.start returns
  @connection = connection || AMQP.connection || AMQP.start

  # a Hash in the id position means the caller passed options as the 2nd argument:
  # fold it into options and allocate a channel id here instead
  if id.is_a?(Hash)
    options = options.merge(id)
    id      = self.class.next_channel_id
  end

  super(@connection, id, options)

  @rpcs = {}

  # This deferrable mimics what AMQP gem 0.7 does, to keep the following
  # (pseudo-synchronous) style of programming found in existing codebases working:
  #
  #   connection = AMQP.connect
  #   channel    = AMQP::Channel.new(connection)
  #   queue      = AMQP::Queue.new(channel)
  #
  # Read more about EM::Deferrable#callback behavior in EventMachine documentation. MK.
  @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new

  @parameter_checks = {
    :queue    => [:durable, :exclusive, :auto_delete, :arguments],
    :exchange => [:type, :durable, :arguments]
  }

  # channel.open is only sent from the connection's on_connection callback, which makes
  # c = AMQP.connect; AMQP::Channel.new(c) possible (something some people do). MK.
  @connection.on_connection do
    self.open do |ch, open_ok|
      @channel_is_open_deferrable.succeed

      if block
        # one-argument blocks get just the channel, any other arity also gets open_ok
        block.arity == 1 ? block.call(ch) : block.call(ch, open_ok)
      end

      # prefetch is applied after the user's block has run
      prefetch_count = options[:prefetch]
      self.prefetch(prefetch_count, false) if prefetch_count
    end
  end
end
# Automatic recovery flag of this channel. Declared as an accessor, so the flag can be
# both read and reassigned on an existing channel object.
# @return [Boolean] true if this channel is in automatic recovery mode
# @see #auto_recovering?
attr_accessor :auto_recovery
# @return [Boolean] true if this channel uses automatic recovery mode
def auto_recovering?
  # hands back the stored flag as is; no coercion to true/false happens here
  @auto_recovery
end
# Called by associated connection object when AMQP connection has been re-established
# (for example, after a network failure).
#
# @api plugin
def auto_recover
  if auto_recovering?
    # fail the deferrable tied to the previous open, then start over with a fresh one
    @channel_is_open_deferrable.fail
    @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new

    self.open do
      @channel_is_open_deferrable.succeed

      # re-establish prefetch, when one was configured
      prefetch_count = @options[:prefetch]
      self.prefetch(prefetch_count, false) if prefetch_count

      # exchanges go first because queue recovery includes recovery of bindings. MK.
      @exchanges.each do |_name, exchange|
        exchange.auto_recover
      end
      @queues.each do |_name, recovered_queue|
        recovered_queue.auto_recover
      end
    end
  end
end # auto_recover
# Can be used to recover channels from channel-level exceptions. Allocates a new channel id and reopens
# itself with this new id, releasing the old id after the new one is allocated.
#
# @api public
def reuse
  # the replacement id is allocated before the old one is released; per the original
  # author's note, releasing first would end up handing back the same value. MK.
  previous_id, @id = @id, self.class.next_channel_id
  self.class.release_channel_id(previous_id)

  # fail the deferrable tied to the previous open, then start over with a fresh one
  @channel_is_open_deferrable.fail
  @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new

  self.open do
    @channel_is_open_deferrable.succeed

    # re-establish prefetch, when one was configured
    prefetch_count = @options[:prefetch]
    self.prefetch(prefetch_count, false) if prefetch_count

    # exchanges go first because queue recovery includes recovery of bindings. MK.
    @exchanges.each do |_name, exchange|
      exchange.auto_recover
    end
    @queues.each do |_name, recovered_queue|
      recovered_queue.auto_recover
    end
  end
end # reuse
# @group Declaring exchanges
# Defines, initializes and returns a direct Exchange instance.
#
# Learn more about direct exchanges in {Exchange Exchange class documentation}.
#
#
# @param [String] name (amq.direct) Exchange name.
#
# @option opts [Boolean] :passive (false) If set, the server will not create the exchange if it does not
# already exist. The client can use this to check whether an exchange
# exists without modifying the server state.
#
# @option opts [Boolean] :durable (false) If set when creating a new exchange, the exchange will be marked as
# durable. Durable exchanges and their bindings are recreated upon a server
# restart (information about them is persisted). Non-durable (transient) exchanges
# do not survive if/when a server restarts (information about them is stored exclusively
# in RAM).
#
#
# @option opts [Boolean] :auto_delete (false) If set, the exchange is deleted when all queues have finished
# using it. The server waits for a short period of time before
# determining the exchange is unused to give time to the client code
# to bind a queue to it.
#
# @option opts [Boolean] :internal (default false) If set, the exchange may not be used directly by publishers, but
# only when bound to other exchanges. Internal exchanges are used to
# construct wiring that is not visible to applications. This is a RabbitMQ-specific
# extension.
#
# @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should
# not wait for a reply method. If the server could not complete the
# method it will raise a channel or connection exception.
#
#
# @raise [AMQP::Error] Raised when exchange is redeclared with parameters different from original declaration.
# @raise [AMQP::Error] Raised when exchange is declared with :passive => true and the exchange does not exist.
#
#
# @example Using default pre-declared direct exchange and no callbacks (pseudo-synchronous style)
#
# # an exchange application A will be using to publish updates
# # to some search index
# exchange = channel.direct("index.updates")
#
# # In the same (or different) process declare a queue that broker will
# # generate name for, bind it to aforementioned exchange using method chaining
# queue = channel.queue("").
# # queue will be receiving messages that were published with
# # :routing_key attribute value of "search.index.updates"
# bind(exchange, :routing_key => "search.index.updates").
# # register a callback that will be run when messages arrive
# subscribe { |header, message| puts("Received #{message}") }
#
# # now publish a new document contents for indexing,
# # message will be delivered to the queue we declared and bound on the line above
# exchange.publish(document.content, :routing_key => "search.index.updates")
#
#
# @example Instantiating a direct exchange using {Channel#direct} with a callback
#
# AMQP.connect do |connection|
# AMQP::Channel.new(connection) do |channel|
# channel.direct("email.replies_listener") do |exchange, declare_ok|
# # by now exchange is ready and waiting
# end
# end
# end
#
#
# @see Channel#default_exchange
# @see Exchange
# @see Exchange#initialize
# @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 3.1.3.1)
#
# @return [Exchange]
# @api public
def direct(name = 'amq.direct', opts = {}, &block)
  cached = find_exchange(name)

  # nothing found under this name: build a new direct exchange and pass it through register_exchange
  return register_exchange(Exchange.new(self, :direct, name, opts, &block)) unless cached

  # redeclaration of a known exchange: the requested options (with defaults applied)
  # are checked against the instance that was found
  requested = Exchange.add_default_options(:direct, name, opts, block)
  validate_parameters_match!(cached, requested, :exchange)

  # on this path the block only receives the exchange
  block.call(cached) if block
  cached
end
# Returns exchange object with the same name as default (aka unnamed) exchange.
# Default exchange is a direct exchange and automatically routes messages to
# queues when routing key matches queue name exactly. This feature is known as
# "automatic binding" (of queues to default exchange).
#
# *Use default exchange when you want to route messages directly to specific queues*
# (queue names are known, you don't mind this kind of coupling between applications).
#
#
# @example Using default exchange to publish messages to queues with known names
# AMQP.start(:host => 'localhost') do |connection|
# ch = AMQP::Channel.new(connection)
#
# queue1 = ch.queue("queue1").subscribe do |payload|
# puts "[#{queue1.name}] => #{payload}"
# end
# queue2 = ch.queue("queue2").subscribe do |payload|
# puts "[#{queue2.name}] => #{payload}"
# end
# queue3 = ch.queue("queue3").subscribe do |payload|
# puts "[#{queue3.name}] => #{payload}"
# end
# queues = [queue1, queue2, queue3]
#
# # Rely on default direct exchange binding, see section 2.1.2.4 Automatic Mode in AMQP 0.9.1 spec.
# exchange = AMQP::Exchange.default
# EM.add_periodic_timer(1) do
# q = queues.sample
#
# exchange.publish "Some payload from #{Time.now.to_i}", :routing_key => q.name
# end
# end
#
#
#
# @see Exchange
# @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 2.1.2.4)
#
# @return [Exchange]
# @api public
def default_exchange
  # memoized: Exchange.default is only asked while nothing truthy is stored yet
  return @default_exchange if @default_exchange

  @default_exchange = Exchange.default(self)
end
# Defines, initializes and returns a fanout Exchange instance.
#
# Learn more about fanout exchanges in {Exchange Exchange class documentation}.
#
#
# @param [String] name (amq.fanout) Exchange name.
#
# @option opts [Boolean] :passive (false) If set, the server will not create the exchange if it does not
# already exist. The client can use this to check whether an exchange
# exists without modifying the server state.
#
# @option opts [Boolean] :durable (false) If set when creating a new exchange, the exchange will be marked as
# durable. Durable exchanges and their bindings are recreated upon a server
# restart (information about them is persisted). Non-durable (transient) exchanges
# do not survive if/when a server restarts (information about them is stored exclusively
# in RAM).
#
#
# @option opts [Boolean] :auto_delete (false) If set, the exchange is deleted when all queues have finished
# using it. The server waits for a short period of time before
# determining the exchange is unused to give time to the client code
# to bind a queue to it.
#
# @option opts [Boolean] :internal (default false) If set, the exchange may not be used directly by publishers, but
# only when bound to other exchanges. Internal exchanges are used to
# construct wiring that is not visible to applications. This is a RabbitMQ-specific
# extension.
#
# @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should
# not wait for a reply method. If the server could not complete the
# method it will raise a channel or connection exception.
#
#
# @raise [AMQP::Error] Raised when exchange is redeclared with parameters different from original declaration.
# @raise [AMQP::Error] Raised when exchange is declared with :passive => true and the exchange does not exist.
#
#
# @example Using fanout exchange to deliver messages to multiple consumers
#
# # open up a channel
# # declare a fanout exchange
# # declare 3 queues, binds them
# # publish a message
#
# @see Exchange
# @see Exchange#initialize
# @see Channel#default_exchange
# @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 3.1.3.2)
#
# @return [Exchange]
# @api public
def fanout(name = 'amq.fanout', opts = {}, &block)
  cached = find_exchange(name)

  # nothing found under this name: build a new fanout exchange and pass it through register_exchange
  return register_exchange(Exchange.new(self, :fanout, name, opts, &block)) unless cached

  # redeclaration of a known exchange: the requested options (with defaults applied)
  # are checked against the instance that was found
  requested = Exchange.add_default_options(:fanout, name, opts, block)
  validate_parameters_match!(cached, requested, :exchange)

  # on this path the block only receives the exchange
  block.call(cached) if block
  cached
end
# Defines, initializes and returns a topic Exchange instance.
#
# Learn more about topic exchanges in {Exchange Exchange class documentation}.
#
# @param [String] name (amq.topic) Exchange name.
#
#
# @option opts [Boolean] :passive (false) If set, the server will not create the exchange if it does not
# already exist. The client can use this to check whether an exchange
# exists without modifying the server state.
#
# @option opts [Boolean] :durable (false) If set when creating a new exchange, the exchange will be marked as
# durable. Durable exchanges and their bindings are recreated upon a server
# restart (information about them is persisted). Non-durable (transient) exchanges
# do not survive if/when a server restarts (information about them is stored exclusively
# in RAM).
#
#
# @option opts [Boolean] :auto_delete (false) If set, the exchange is deleted when all queues have finished
# using it. The server waits for a short period of time before
# determining the exchange is unused to give time to the client code
# to bind a queue to it.
#
# @option opts [Boolean] :internal (default false) If set, the exchange may not be used directly by publishers, but
# only when bound to other exchanges. Internal exchanges are used to
# construct wiring that is not visible to applications. This is a RabbitMQ-specific
# extension.
#
# @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should
# not wait for a reply method. If the server could not complete the
# method it will raise a channel or connection exception.
#
#
# @raise [AMQP::Error] Raised when exchange is redeclared with parameters different from original declaration.
# @raise [AMQP::Error] Raised when exchange is declared with :passive => true and the exchange does not exist.
#
#
# @example Using topic exchange to deliver relevant news updates
# AMQP.connect do |connection|
# channel = AMQP::Channel.new(connection)
# exchange = channel.topic("pub/sub")
#
# # Subscribers.
# channel.queue("development").bind(exchange, :key => "technology.dev.#").subscribe do |payload|
# puts "A new dev post: '#{payload}'"
# end
# channel.queue("ruby").bind(exchange, :key => "technology.#.ruby").subscribe do |payload|
# puts "A new post about Ruby: '#{payload}'"
# end
#
# # Let's publish some data.
# exchange.publish "Ruby post", :routing_key => "technology.dev.ruby"
# exchange.publish "Erlang post", :routing_key => "technology.dev.erlang"
# exchange.publish "Sinatra post", :routing_key => "technology.web.ruby"
# exchange.publish "Jewelery post", :routing_key => "jewelery.ruby"
# end
#
#
# @example Using topic exchange to deliver geographically-relevant data
# AMQP.connect do |connection|
# channel = AMQP::Channel.new(connection)
# exchange = channel.topic("pub/sub")
#
# # Subscribers.
# channel.queue("americas.north").bind(exchange, :routing_key => "americas.north.#").subscribe do |headers, payload|
# puts "An update for North America: #{payload}, routing key is #{headers.routing_key}"
# end
# channel.queue("americas.south").bind(exchange, :routing_key => "americas.south.#").subscribe do |headers, payload|
# puts "An update for South America: #{payload}, routing key is #{headers.routing_key}"
# end
# channel.queue("us.california").bind(exchange, :routing_key => "americas.north.us.ca.*").subscribe do |headers, payload|
# puts "An update for US/California: #{payload}, routing key is #{headers.routing_key}"
# end
# channel.queue("us.tx.austin").bind(exchange, :routing_key => "#.tx.austin").subscribe do |headers, payload|
# puts "An update for Austin, TX: #{payload}, routing key is #{headers.routing_key}"
# end
# channel.queue("it.rome").bind(exchange, :routing_key => "europe.italy.rome").subscribe do |headers, payload|
# puts "An update for Rome, Italy: #{payload}, routing key is #{headers.routing_key}"
# end
# channel.queue("asia.hk").bind(exchange, :routing_key => "asia.southeast.hk.#").subscribe do |headers, payload|
# puts "An update for Hong Kong: #{payload}, routing key is #{headers.routing_key}"
# end
#
# exchange.publish("San Diego update", :routing_key => "americas.north.us.ca.sandiego").
# publish("Berkeley update", :routing_key => "americas.north.us.ca.berkeley").
# publish("San Francisco update", :routing_key => "americas.north.us.ca.sanfrancisco").
# publish("New York update", :routing_key => "americas.north.us.ny.newyork").
# publish("São Paolo update", :routing_key => "americas.south.brazil.saopaolo").
# publish("Hong Kong update", :routing_key => "asia.southeast.hk.hongkong").
# publish("Kyoto update", :routing_key => "asia.southeast.japan.kyoto").
# publish("Shanghai update", :routing_key => "asia.southeast.prc.shanghai").
# publish("Rome update", :routing_key => "europe.italy.roma").
# publish("Paris update", :routing_key => "europe.france.paris")
# end
#
# @see Exchange
# @see Exchange#initialize
# @see http://www.rabbitmq.com/faq.html#Binding-and-Routing RabbitMQ FAQ on routing & wildcards
# @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 3.1.3.3)
#
# @return [Exchange]
# @api public
def topic(name = 'amq.topic', opts = {}, &block)
  cached = find_exchange(name)

  # nothing found under this name: build a new topic exchange and pass it through register_exchange
  return register_exchange(Exchange.new(self, :topic, name, opts, &block)) unless cached

  # redeclaration of a known exchange: the requested options (with defaults applied)
  # are checked against the instance that was found
  requested = Exchange.add_default_options(:topic, name, opts, block)
  validate_parameters_match!(cached, requested, :exchange)

  # on this path the block only receives the exchange
  block.call(cached) if block
  cached
end
# Defines, initializes and returns a headers Exchange instance.
#
# Learn more about headers exchanges in {Exchange Exchange class documentation}.
#
# @param [String] name (amq.match) Exchange name.
#
# @option opts [Boolean] :passive (false) If set, the server will not create the exchange if it does not
# already exist. The client can use this to check whether an exchange
# exists without modifying the server state.
#
# @option opts [Boolean] :durable (false) If set when creating a new exchange, the exchange will be marked as
# durable. Durable exchanges and their bindings are recreated upon a server
# restart (information about them is persisted). Non-durable (transient) exchanges
# do not survive if/when a server restarts (information about them is stored exclusively
# in RAM).
#
#
# @option opts [Boolean] :auto_delete (false) If set, the exchange is deleted when all queues have finished
# using it. The server waits for a short period of time before
# determining the exchange is unused to give time to the client code
# to bind a queue to it.
#
# @option opts [Boolean] :internal (default false) If set, the exchange may not be used directly by publishers, but
# only when bound to other exchanges. Internal exchanges are used to
# construct wiring that is not visible to applications. This is a RabbitMQ-specific
# extension.
#
# @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should
# not wait for a reply method. If the server could not complete the
# method it will raise a channel or connection exception.
#
#
# @raise [AMQP::Error] Raised when exchange is redeclared with parameters different from original declaration.
# @raise [AMQP::Error] Raised when exchange is declared with :passive => true and the exchange does not exist.
#
#
# @example Using headers exchange to route messages based on multiple attributes (OS, architecture, # of cores)
#
# puts "=> Headers routing example"
# puts
# AMQP.start do |connection|
# channel = AMQP::Channel.new(connection)
# channel.on_error do |ch, channel_close|
# puts "A channel-level exception: #{channel_close.inspect}"
# end
#
# exchange = channel.headers("amq.match", :durable => true)
#
# channel.queue("", :auto_delete => true).bind(exchange, :arguments => { 'x-match' => 'all', :arch => "x64", :os => 'linux' }).subscribe do |metadata, payload|
# puts "[linux/x64] Got a message: #{payload}"
# end
# channel.queue("", :auto_delete => true).bind(exchange, :arguments => { 'x-match' => 'all', :arch => "x32", :os => 'linux' }).subscribe do |metadata, payload|
# puts "[linux/x32] Got a message: #{payload}"
# end
# channel.queue("", :auto_delete => true).bind(exchange, :arguments => { 'x-match' => 'any', :os => 'linux', :arch => "__any__" }).subscribe do |metadata, payload|
# puts "[linux] Got a message: #{payload}"
# end
# channel.queue("", :auto_delete => true).bind(exchange, :arguments => { 'x-match' => 'any', :os => 'macosx', :cores => 8 }).subscribe do |metadata, payload|
# puts "[macosx|octocore] Got a message: #{payload}"
# end
#
#
# EventMachine.add_timer(0.5) do
# exchange.publish "For linux/x64", :headers => { :arch => "x64", :os => 'linux' }
# exchange.publish "For linux/x32", :headers => { :arch => "x32", :os => 'linux' }
# exchange.publish "For linux", :headers => { :os => 'linux' }
# exchange.publish "For OS X", :headers => { :os => 'macosx' }
# exchange.publish "For solaris/x64", :headers => { :os => 'solaris', :arch => 'x64' }
# exchange.publish "For ocotocore", :headers => { :cores => 8 }
# end
#
#
# show_stopper = Proc.new do
# $stdout.puts "Stopping..."
# connection.close {
# EventMachine.stop { exit }
# }
# end
#
# Signal.trap "INT", show_stopper
# EventMachine.add_timer(2, show_stopper)
# end
#
#
#
# @see Exchange
# @see Exchange#initialize
# @see Channel#default_exchange
# @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 3.1.3.3)
#
# @return [Exchange]
# @api public
def headers(name = 'amq.match', opts = {}, &block)
  cached = find_exchange(name)

  # nothing found under this name: build a new headers exchange and pass it through register_exchange
  return register_exchange(Exchange.new(self, :headers, name, opts, &block)) unless cached

  # redeclaration of a known exchange: the requested options (with defaults applied)
  # are checked against the instance that was found
  requested = Exchange.add_default_options(:headers, name, opts, block)
  validate_parameters_match!(cached, requested, :exchange)

  # on this path the block only receives the exchange
  block.call(cached) if block
  cached
end
# @endgroup
# @group Declaring queues
# Declares and returns a Queue instance associated with this channel. See {Queue Queue class documentation} for
# more information about queues.
#
# To make broker generate queue name for you (a classic example is exclusive
# queues that are only used for a short period of time), pass empty string
# as name value. Then queue will get its name as soon as broker's response
# (queue.declare-ok) arrives. Note that in this case, block is required.
#
#
# Like for exchanges, queue names starting with 'amq.' cannot be modified and
# should not be used by applications.
#
# @example Declaring a queue in a mail delivery app using Channel#queue without a block
# AMQP.connect do |connection|
# AMQP::Channel.new(connection) do |ch|
# # message producers will be able to send messages to this queue
# # using direct exchange and routing key = "mail.delivery"
# queue = ch.queue("mail.delivery", :durable => true)
# queue.subscribe do |headers, payload|
# # ...
# end
# end
# end
#
# @example Declaring a server-named exclusive queue that receives all messages related to events, using a block.
# AMQP.connect do |connection|
# AMQP::Channel.new(connection) do |ch|
# # message producers will be able to send messages to this queue
# # using amq.topic exchange with routing keys that begin with "events"
# ch.queue("", :exclusive => true) do |queue|
# queue.bind(ch.exchange("amq.topic"), :routing_key => "events.#").subscribe do |headers, payload|
# # ...
# end
# end
# end
# end
#
# @param [String] name Queue name. If you want a server-named queue, you can omit the name (note that in this case, using block is mandatory).
# See {Queue Queue class documentation} for discussion of queue lifecycles and when use of server-named queues
# is optimal.
#
# @option opts [Boolean] :passive (false) If set, the server will not create the queue if it does not
# already exist. The client can use this to check whether a queue
# exists without modifying the server state.
#
# @option opts [Boolean] :durable (false) If set when creating a new queue, the queue will be marked as
# durable. Durable queues and their bindings are recreated upon a server
# restart (information about them is persisted). Non-durable (transient) queues
# do not survive if/when a server restarts (information about them is stored exclusively
# in RAM). Any remaining messages in the queue will be purged when the queue
# is deleted regardless of the message's persistence setting.
#
#
# @option opts [Boolean] :auto_delete (false) If set, the queue is deleted when all consumers have finished
# using it. The server waits for a short period of time before
# determining the queue is unused to give time to the client code
# to add a consumer to it.
#
# @option opts [Boolean] :exclusive (false) Exclusive queues may only be used by a single connection.
# Exclusivity also implies that queue is automatically deleted when connection
# is closed. Only one consumer is allowed to remove messages from exclusive queue.
#
# @option opts [Boolean] :nowait (true) If set, the server will not respond to the method. The client should
# not wait for a reply method. If the server could not complete the
# method it will raise a channel or connection exception.
#
#
# @raise [AMQP::Error] Raised when queue is redeclared with parameters different from original declaration.
# @raise [AMQP::Error] Raised when queue is declared with :passive => true and the queue does not exist.
# @raise [AMQP::Error] Raised when queue is declared with :exclusive => true and queue with that name already exist.
#
#
# @yield [queue, declare_ok] Yields successfully declared queue instance and AMQP method (queue.declare-ok) instance. The latter is optional.
# @yieldparam [Queue] queue Queue that is successfully declared and is ready to be used.
# @yieldparam [AMQP::Protocol::Queue::DeclareOk] declare_ok AMQP method (queue.declare-ok) instance.
#
# @see Queue
# @see Queue#initialize
# @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Specification.pdf AMQP 0.9.1 specification (Section 2.1.4)
#
# @return [Queue]
# @api public
def queue(name = AMQ::Protocol::EMPTY_STRING, opts = {}, &block)
  # nil is rejected outright; an empty string asks the broker to pick a name.
  if name.nil?
    raise ArgumentError, "queue name must not be nil; if you want broker to generate queue name for you, pass an empty string"
  end

  # Queues with an empty (server-generated) name are never looked up in the cache.
  cached = name.empty? ? nil : find_queue(name)

  # Cache miss: declare a new queue and let #queue! register it.
  return queue!(name, opts, &block) unless cached

  # Cache hit: the requested options must agree with the original declaration.
  requested = Queue.add_default_options(name, opts, block)
  validate_parameters_match!(cached, requested, :queue)
  block.call(cached) if block
  cached
end
# Same as {Channel#queue} but when queue with the same name already exists in this channel
# object's cache, this method will replace existing queue with a newly defined one. Consider
# using {Channel#queue} instead.
#
# @see Channel#queue
#
# @return [Queue]
# @api public
def queue!(name, opts = {}, &block)
  declared =
    if block
      # Adapter between the declare-ok callback and the caller's block:
      # one-argument blocks only get the queue, anything else also gets
      # the consumer and message counts from the declare-ok method.
      adapter = Proc.new do |q, declare_ok|
        if block.arity == 1
          block.call(q)
        else
          registered = find_queue(declare_ok.queue)
          block.call(registered, declare_ok.consumer_count, declare_ok.message_count)
        end
      end
      Queue.new(self, name, opts, &adapter)
    else
      Queue.new(self, name, opts)
    end

  # Registration happens unconditionally, even when a queue of this name is already cached.
  register_queue(declared)
end
# @return [Array] Queues cache for this channel
# @api plugin
# @private
def queues
# Plain reader: hands back the @queues instance variable itself (no copy is made here).
@queues
end # queues
# @endgroup
# Instantiates and returns an RPC instance associated with this channel.
#
# The optional object may be a class name, module name or object
# instance. When given a class or module name, the object is instantiated
# during this setup. The passed queue is automatically subscribed to so
# it passes all messages (and their arguments) to the object.
#
# Marshalling and unmarshalling the objects is handled internally. This
# marshalling is subject to the same restrictions as defined in the
# {http://ruby-doc.org/core/classes/Marshal.html Marshal module} in the Ruby standard
# library.
#
# When the optional object is not passed, the returned rpc reference is
# used to send messages and arguments to the queue. See {AMQP::RPC#method_missing}
# which does all of the heavy lifting with the proxy. Some client
# elsewhere must call this method *with* the optional block so that
# there is a valid destination. Failure to do so will just enqueue
# marshalled messages that are never consumed.
#
# @example Use of RPC
#
# # TODO
#
#
# @param [String, Queue] name Queue to be used by RPC server.
# @return [RPC]
# @api public
def rpc(name, obj = nil)
# Builds a new RPC object bound to this channel; name and obj are handed over unchanged.
RPC.new(self, name, obj)
end
# Returns an array of all rpc proxy objects.
#
# Most of the time, this method is not
# called by application code.
# @api plugin
def rpcs
# @rpcs is a Hash (created in #reset_state!, filled by #register_rpc under rpc.name);
# only its values are returned, as an Array.
@rpcs.values
end
# @group Channel lifecycle
# Opens AMQP channel.
#
# @note Instantiated channels are opened by default. This method should only be used for error recovery after network connection loss.
# @api public
def open(&block)
  # Pure delegation: bare super forwards the block to the superclass implementation.
  super
end
# @return [Boolean] true if channel is open or is in the process of being opened.
# @api public
def open?
  # Both the fully opened state and the in-progress :opening state count as open.
  [:opened, :opening].include?(status)
end # open?
# Takes a block that will be deferred till the moment when channel is considered open
# (channel.open-ok is received from the broker). If you need to delay an operation
# till the moment channel is open, this method is what you are looking for.
#
# Multiple callbacks are supported. If the channel is already open when this method
# is called, the block is executed immediately.
#
# @api public
def once_open(&block)
  @channel_is_open_deferrable.callback do
    # The channel may no longer be open by the time this deferred callback
    # fires; in that case the block is skipped.
    next unless open?

    block.call
  end
end # once_open(&block)
alias once_opened once_open
# @return [Boolean]
# @api public
def closing?
  # True only while the status is exactly :closing.
  status == :closing
end
# Closes AMQP channel.
#
# @api public
def close(reply_code = 200, reply_text = DEFAULT_REPLY_TEXT, class_id = 0, method_id = 0, &block)
  # Flag the channel as closing before the superclass takes over.
  self.status = :closing

  # The superclass result is the return value of this method.
  super(reply_code, reply_text, class_id, method_id, &block)
end
# @endgroup
# @group QoS and flow handling
# Asks the peer to pause or restart the flow of content data sent to a consumer.
# This is a simple flow control mechanism that a peer can use to avoid overflowing its
# queues or otherwise finding itself receiving more messages than it can process. Note that
# this method is not intended for window control. It does not affect contents returned to
# Queue#get callers.
#
# @param [Boolean] active Desired flow state.
#
# @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.5.2.3.)
# @api public
def flow(active = false, &block)
  # Bare super forwards both the flag and the block to the superclass.
  super
end
# @return [Boolean] True if flow in this channel is active (messages will be delivered to consumers that use this channel).
#
# @api public
def flow_is_active?
# Reads the @flow_is_active instance variable; this method never assigns it.
@flow_is_active
end # flow_is_active?
# @param [Fixnum] count Message count
# @param [Boolean] global (false)
#
# @return [Channel] self
#
# @api public
def prefetch(count, global = false, &block)
  # RabbitMQ as of 2.3.1 does not support prefetch_size, hence the hard-coded 0.
  # The qos call is deferred through #once_open.
  once_open { qos(0, count, global, &block) }

  self
end
# @endgroup
# @group Message acknowledgements
# Acknowledge one or all messages on the channel.
#
# @api public
# @see #reject
# @see #recover
# @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.8.3.13.)
def acknowledge(delivery_tag, multiple = false)
  # Bare super passes delivery_tag and multiple on to the superclass unchanged.
  super
end # acknowledge(delivery_tag, multiple = false)
# Reject a message with given delivery tag.
#
# @api public
# @see #acknowledge
# @see #recover
# @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.8.3.14.)
def reject(delivery_tag, requeue = true)
  # Bare super passes delivery_tag and requeue on to the superclass unchanged.
  super
end # reject(delivery_tag, requeue = true)
# Notifies AMQ broker that consumer has recovered and unacknowledged messages need
# to be redelivered.
#
# @return [Channel] self
#
# @note RabbitMQ as of 2.3.1 does not support basic.recover with requeue = false.
# @see http://files.travis-ci.org/docs/amqp/0.9.1/AMQP091Reference.pdf AMQP 0.9.1 protocol documentation (Section 1.8.3.16.)
# @see #acknowledge
# @api public
def recover(requeue = true, &block)
  # Bare super forwards the requeue flag and the block to the superclass.
  super
end # recover(requeue = true, &block)
# @endgroup
# @group Transactions
# Sets the channel to use standard transactions. One must use this method at least
# once on a channel before using #tx_commit or #tx_rollback methods.
#
# @api public
def tx_select(&block)
  # Pure delegation: bare super forwards the block to the superclass.
  super
end # tx_select(&block)
# Commits AMQP transaction.
#
# @api public
def tx_commit(&block)
  # Pure delegation: bare super forwards the block to the superclass.
  super
end # tx_commit(&block)
# Rolls AMQP transaction back.
#
# @api public
def tx_rollback(&block)
  # Pure delegation: bare super forwards the block to the superclass.
  super
end # tx_rollback(&block)
# @endgroup
# @group Error handling
# Defines a callback that will be executed when channel is closed after
# channel-level exception.
#
# @api public
def on_error(&block)
  # Pure delegation: bare super forwards the block to the superclass.
  super
end
# Defines a global callback to be run on channel-level exception across
# all channels. Consider using Channel#on_error instead. This method is here for sake
# of backwards compatibility with 0.6.x and 0.7.x releases.
# @see AMQP::Channel#on_error
# @deprecated
# @api public
def self.on_error(&block)
  # Deprecated entry point: everything is handled by the class-level .error method.
  error(&block)
end # self.on_error(&block)
# @endgroup
#
# Implementation
#
# Defines a global callback to be run on channel-level exception across
# all channels. Consider using Channel#on_error instead. This method is here for sake
# of backwards compatibility with 0.6.x and 0.7.x releases.
#
# @param [String] msg Error message that passed to previously defined handler
#
# @deprecated
# @api public
# @private
def self.error(msg = nil, &block)
  if block
    # Registration mode: the block replaces any previously installed handler.
    @global_error_handler = block
  elsif @global_error_handler && msg
    # Dispatch mode: needs both an installed handler and a message.
    @global_error_handler.call(msg)
  end
end
# Overrides AMQ::Client::Channel version to also call global callback
# (if defined) for backwards compatibility.
#
# @private
# @api private
def handle_close(method)
  super

  channel_class = self.class
  # Backwards compatibility: also pass the reply text to the class-level error handler.
  channel_class.error(method.reply_text)
  # Hand this channel's id back via the class-level release method.
  channel_class.release_channel_id(@id)
end
# Overrides AMQ::Client::Channel version to also release the channel id
#
# @private
# @api private
def handle_close_ok(method)
  super

  # Hand this channel's id back via the class-level release method.
  channel_class = self.class
  channel_class.release_channel_id(@id)
end
# Resets channel state (for example, list of registered queue objects and so on).
#
# Most of the time, this method is not
# called by application code.
#
# @private
# @api plugin
def reset(&block)
  # See AMQ::Client::Channel
  reset_state!

  # there is no way to reset a deferrable; we have to use a new instance. MK.
  @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new
  @channel_is_open_deferrable.callback(&block)

  @connection.on_connection do
    # The instance variable is read here, when the callback runs, so whichever
    # deferrable is current at that moment is the one that gets succeeded.
    @channel_is_open_deferrable.succeed

    prefetch_count = @options[:prefetch]
    prefetch(prefetch_count, false) if prefetch_count
  end
end
# @private
# @api plugin
def reset_state!
  super

  # Start over with an empty rpc registry (a Hash with no default value).
  @rpcs = {}
end # reset_state!
# Overrides superclass method to also re-create @channel_is_open_deferrable
#
# @api plugin
# @private
def handle_connection_interruption(reason = nil)
  super

  # The channel id is only released when automatic recovery is off.
  unless auto_recovering?
    self.class.release_channel_id(@id)
  end

  # A fresh deferrable replaces the previous one.
  @channel_is_open_deferrable = AMQ::Client::EventMachineClient::Deferrable.new
end
# @private
# @api private
def self.channel_id_mutex
  # Lazily created on first use, then the same Mutex is handed out every time.
  return @channel_id_mutex if @channel_id_mutex

  @channel_id_mutex = Mutex.new
end
# Returns next available channel id. This method is thread safe.
#
# @return [Fixnum]
# @api public
# @see Channel.release_channel_id
# @see Channel.reset_channel_id_allocator
def self.next_channel_id
  lock = channel_id_mutex
  lock.synchronize do
    # Make sure the shared allocator exists before asking it for an id.
    initialize_channel_id_allocator
    @int_allocator.allocate
  end
end
# Releases previously allocated channel id. This method is thread safe.
#
# @param [Fixnum] i Channel id to release
# @api public
# @see Channel.next_channel_id
# @see Channel.reset_channel_id_allocator
def self.release_channel_id(i)
  lock = channel_id_mutex
  lock.synchronize do
    # Make sure the shared allocator exists before handing the id back.
    initialize_channel_id_allocator
    @int_allocator.release(i)
  end
end # self.release_channel_id(i)
# Resets channel allocator. This method is thread safe.
# @api public
# @see Channel.next_channel_id
# @see Channel.release_channel_id
def self.reset_channel_id_allocator
  lock = channel_id_mutex
  lock.synchronize do
    # Make sure the shared allocator exists, then reset it.
    initialize_channel_id_allocator
    @int_allocator.reset
  end
end # self.reset_channel_id_allocator
# @private
def self.initialize_channel_id_allocator
  # TODO: ideally, the upper bound should agree with the max number of channels negotiated
  #       for the connection, but that value may not be available yet. MK.
  # Until then the allocator covers ids 1 through (1 << 16) - 1.
  @int_allocator ||= IntAllocator.new(1, (1 << 16) - 1)
end # self.initialize_channel_id_allocator
# @private
# @api plugin
def register_rpc(rpc)
  raise(ArgumentError, "argument is nil!") unless rpc

  # Registry is keyed by the rpc's own name; a later registration under the
  # same name replaces the earlier one.
  key = rpc.name
  @rpcs[key] = rpc
end # register_rpc(rpc)
# @private
# @api plugin
def find_rpc(name)
# Hash lookup in @rpcs (filled by #register_rpc under rpc.name). Unknown names give nil,
# since #reset_state! creates the hash without a default value.
@rpcs[name]
end
#
# Backwards compatibility with 0.6.x
#
# unique identifier of the default thread-local channel
# @deprecated
# @private
def self.id
  # Memoized per thread under Thread.current[:mq_id].
  #
  # The host name comes from Socket.gethostname rather than from running the
  # external `hostname` command in a subshell: no process is spawned and the
  # method no longer fails on systems where that executable is missing.
  #
  # @return [String] "<hostname>-<pid>-<object id of the current thread>"
  Thread.current[:mq_id] ||= begin
    require "socket" # stdlib; a no-op when already loaded
    "#{Socket.gethostname}-#{Process.pid}-#{Thread.current.object_id}"
  end
end
# @private
# @deprecated
def self.default
  # TODO: clear this when connection is closed
  # One channel per thread, created on first use and cached in the thread-local :mq slot.
  thread = Thread.current
  thread[:mq] ||= AMQP::Channel.new
end
# Allows for calls to all MQ instance methods. This implicitly calls
# AMQP::Channel.new so that a new channel is allocated for subsequent operations.
# @deprecated
def self.method_missing(meth, *args, &blk)
  # Every unknown class-level call is replayed on the thread's default channel;
  # __send__ means the target's private methods are reachable as well.
  target = default
  target.__send__(meth, *args, &blk)
end
protected
# @private
def validate_parameters_match!(entity, parameters, type)
  # Only the option keys registered for this entity type are compared.
  checked_keys = @parameter_checks[type]
  declared  = entity.opts.values_at(*checked_keys)
  requested = parameters.values_at(*checked_keys)

  # Matching options are fine, and a passive request is let through even when they differ.
  return if declared == requested || parameters[:passive]

  raise AMQP::IncompatibleOptionsError.new(entity.name, entity.opts, parameters)
end # validate_parameters_match!(entity, parameters, type)
end # Channel
end # AMQP