Class: Isimud::BunnyClient
Overview
Interface for Bunny RabbitMQ client
Constant Summary
- DEFAULT_URL = 'amqp://guest:guest@localhost'
- CHANNEL_KEY = isimud.bunny_client.channel
Instance Attribute Summary
- (Object) url
readonly
Returns the value of attribute url.
Instance Method Summary
- (Bunny::Consumer) bind(queue_name, exchange_name, *routing_keys) {|payload| ... }
Convenience method that finds or creates a named queue, binds to an exchange, and subscribes to messages.
- (Bunny::Channel) channel
Open a new, thread-specific AMQP connection channel, or return the current channel for this thread if it exists and is currently open.
- (Object) close
Close the AMQP connection and clear it from the instance.
- (Boolean?) connected?
Determine if a Bunny connection is currently established to the AMQP server.
- (Bunny::Session) connection
(also: #connect)
Establish a connection to the AMQP server, or return the current connection if one already exists.
- (Bunny::Queue) create_queue(queue_name, exchange_name, options = {})
Find or create a named queue and bind it to the specified exchange.
- (AMQ::Protocol::Queue::DeleteOk) delete_queue(queue_name)
Permanently delete the queue from the AMQP server.
- (Object) find_queue(queue_name, options = {durable: true})
Look up a queue by name, or create it if it does not already exist.
- (BunnyClient) initialize(_url = nil, _bunny_options = {})
constructor
Initialize a new BunnyClient instance.
- (Object) publish(exchange, routing_key, payload)
Publish a message to the specified exchange, which is declared as a durable, topic exchange.
- (Bunny::Session) reconnect
Close and reopen the AMQP connection.
- (Object) reset
Reset this client by closing all channels for the connection.
- (Object) subscribe(queue, options = {manual_ack: true}) {|payload| ... }
Subscribe to messages on the Bunny queue.
Methods inherited from Client
#on_exception, #run_exception_handlers
Methods included from Logging
Constructor Details
- (BunnyClient) initialize(_url = nil, _bunny_options = {})
Initialize a new BunnyClient instance. No connection is established at this point; it is opened lazily the first time a method that needs it (for example #connection or #channel) is called.
# File 'lib/isimud/bunny_client.rb', line 18
def initialize(_url = nil, _bunny_options = {})
  log "Isimud::BunnyClient.initialize: options = #{_bunny_options.inspect}"
  @url = _url || DEFAULT_URL
  @bunny_options = _bunny_options
end
Instance Attribute Details
- (Object) url (readonly)
Returns the value of attribute url.
# File 'lib/isimud/bunny_client.rb', line 11
def url
  @url
end
Instance Method Details
- (Bunny::Consumer) bind(queue_name, exchange_name, *routing_keys) {|payload| ... }
Convenience method that finds or creates a named durable queue, binds it to an exchange, and subscribes to messages. If a block is provided, the consumer calls it each time a message is received. Without a block, the queue is still created, but no subscription is started.
# File 'lib/isimud/bunny_client.rb', line 33
def bind(queue_name, exchange_name, *routing_keys, &block)
  queue = create_queue(queue_name, exchange_name, queue_options: {durable: true}, routing_keys: routing_keys)
  subscribe(queue, &block) if block_given?
end
- (Bunny::Channel) channel
Open a new, thread-specific AMQP connection channel, or return the current channel for this thread if it exists and is currently open. New channels are created with publisher confirms enabled. If Isimud.prefetch_count is set, it is applied to the new channel as its prefetch limit.
# File 'lib/isimud/bunny_client.rb', line 107
def channel
  if (channel = Thread.current[CHANNEL_KEY]).try(:open?)
    channel
  else
    new_channel = connection.channel
    new_channel.confirm_select
    new_channel.prefetch(Isimud.prefetch_count) if Isimud.prefetch_count
    Thread.current[CHANNEL_KEY] = new_channel
  end
end
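The per-thread caching described for #channel can be sketched without a broker. This is a minimal illustration, not the library's code: FakeChannel, FakeConnection, ChannelCache, and the key name are hypothetical stand-ins, the safe-navigation operator replaces ActiveSupport's try, and the confirm_select and prefetch steps are omitted.

```ruby
# Hypothetical stand-in for Bunny::Channel; models only open/closed state.
class FakeChannel
  def initialize
    @open = true
  end

  def open?
    @open
  end

  def close
    @open = false
  end
end

# Hypothetical stand-in for Bunny::Session; hands out a new channel per call.
class FakeConnection
  def channel
    FakeChannel.new
  end
end

# Hypothetical wrapper that mirrors the thread-local lookup in #channel.
class ChannelCache
  CHANNEL_KEY = 'sketch.channel_cache.channel'

  def initialize(connection)
    @connection = connection
  end

  # Return this thread's channel if it is still open; otherwise open a
  # new one and remember it for the current thread.
  def channel
    current = Thread.current[CHANNEL_KEY]
    return current if current&.open?

    Thread.current[CHANNEL_KEY] = @connection.channel
  end
end

cache  = ChannelCache.new(FakeConnection.new)
first  = cache.channel
again  = cache.channel                        # same object as first
other  = Thread.new { cache.channel }.value   # a different channel
first.close
reopened = cache.channel                      # closed channel is replaced
```

Each thread ends up with its own channel, and a closed channel is replaced transparently on the next call.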
- (Object) close
Close the AMQP connection and clear it from the instance.
# File 'lib/isimud/bunny_client.rb', line 132
def close
  connection.close
ensure
  @connection = nil
end
- (Boolean?) connected?
Determine if a Bunny connection is currently established to the AMQP server. Returns true if the connection is open, false if a connection exists but is closed or closing, or nil if no connection has been established.
# File 'lib/isimud/bunny_client.rb', line 126
def connected?
  @connection && @connection.open?
end
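The three possible results of connected? follow directly from the short-circuiting && in the method body. The sketch below reproduces that behaviour with hypothetical stand-ins (StubSession and TriStateClient are not part of Isimud or Bunny), so it runs without a broker.

```ruby
# Hypothetical stand-in for Bunny::Session; models only open/closed state.
class StubSession
  def initialize
    @open = false
  end

  def start
    @open = true
    self
  end

  def close
    @open = false
  end

  def open?
    @open
  end
end

# Hypothetical client that mirrors only the connected? check.
class TriStateClient
  def connection
    @connection ||= StubSession.new.tap(&:start)
  end

  def connected?
    @connection && @connection.open?
  end
end

client = TriStateClient.new
before_connect = client.connected?   # nil: no session has been created yet
client.connection
while_open = client.connected?       # true: the session is open
client.connection.close
after_close = client.connected?      # false: the session exists but is closed
```

Callers that only need a yes/no answer can treat nil and false alike, since both are falsy in Ruby.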
- (Bunny::Session) connection Also known as: connect
Establish a connection to the AMQP server, or return the current connection if one already exists.
# File 'lib/isimud/bunny_client.rb', line 95
def connection
  @connection ||= ::Bunny.new(url, @bunny_options).tap(&:start)
end
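The memoize-and-start idiom used by #connection can be seen in isolation in the sketch below. CountingSession and MemoizingClient are hypothetical names introduced for illustration; the counter exists only to show that start runs once no matter how often the connection is requested.

```ruby
# Hypothetical stand-in for Bunny::Session that counts calls to start.
class CountingSession
  class << self
    attr_accessor :started
  end
  self.started = 0

  attr_reader :url, :options

  def initialize(url, options = {})
    @url = url
    @options = options
  end

  def start
    self.class.started += 1
    self
  end
end

# Hypothetical client that mirrors the lazy, memoized connection.
class MemoizingClient
  DEFAULT_URL = 'amqp://guest:guest@localhost'

  def initialize(url = nil, options = {})
    @url = url || DEFAULT_URL
    @options = options
  end

  # Build and start the session on first use, then keep returning it.
  def connection
    @connection ||= CountingSession.new(@url, @options).tap(&:start)
  end
  alias connect connection
end

client = MemoizingClient.new
first  = client.connection
second = client.connect   # alias returns the same, already started session
```

Because tap returns the receiver, the started session itself is what gets memoized, rather than the return value of start.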
- (Bunny::Queue) create_queue(queue_name, exchange_name, options = {})
Find or create a named queue and bind it to the specified exchange.
# File 'lib/isimud/bunny_client.rb', line 50
def create_queue(queue_name, exchange_name, options = {})
  queue_options = options[:queue_options] || {durable: true}
  routing_keys = options[:routing_keys] || []
  log "Isimud::BunnyClient: create_queue #{queue_name}: queue_options=#{queue_options.inspect}"
  queue = find_queue(queue_name, queue_options)
  bind_routing_keys(queue, exchange_name, routing_keys) if routing_keys.any?
  queue
end
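The way create_queue reads its options hash has one consequence worth spelling out: a supplied :queue_options value replaces the default rather than being merged with it. The helper below is a hypothetical extraction of just that option handling (queue_settings is not a method of the library), written so the behaviour can be checked without a broker.

```ruby
# Hypothetical helper isolating the option defaults used by create_queue.
def queue_settings(options = {})
  queue_options = options[:queue_options] || { durable: true }
  routing_keys  = options[:routing_keys] || []
  [queue_options, routing_keys]
end

# With no options, the queue is durable and has no routing keys to bind.
defaults = queue_settings

# A caller-supplied hash is used as-is, so durable is no longer set here.
custom = queue_settings(queue_options: { exclusive: true },
                        routing_keys: ['orders.*'])
```

A caller who wants a queue that is both durable and exclusive would therefore need to pass both keys explicitly.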
- (AMQ::Protocol::Queue::DeleteOk) delete_queue(queue_name)
Permanently delete the queue from the AMQP server. Any messages present in the queue will be discarded.
# File 'lib/isimud/bunny_client.rb', line 89
def delete_queue(queue_name)
  channel.queue_delete(queue_name)
end
- (Object) find_queue(queue_name, options = {durable: true})
Look up a queue by name, or create it if it does not already exist.
# File 'lib/isimud/bunny_client.rb', line 156
def find_queue(queue_name, options = {durable: true})
  channel.queue(queue_name, options)
end
- (Object) publish(exchange, routing_key, payload)
Publish a message to the specified exchange, which is declared as a durable topic exchange. The message is always published as persistent.
# File 'lib/isimud/bunny_client.rb', line 144
def publish(exchange, routing_key, payload)
  channel.topic(exchange, durable: true).publish(payload, routing_key: routing_key, persistent: true)
end
- (Bunny::Session) reconnect
Close and reopen the AMQP connection.
# File 'lib/isimud/bunny_client.rb', line 150
def reconnect
  close
  connect
end
- (Object) reset
Reset this client by closing all channels for the connection.
# File 'lib/isimud/bunny_client.rb', line 119
def reset
  connection.close_all_channels
end
- (Object) subscribe(queue, options = {manual_ack: true}) {|payload| ... }
Subscribe to messages on the Bunny queue. The provided block is called each time a message is received. The message is acknowledged, and thereby removed from the queue, unless the block raises an exception. If an exception is raised, the message is rejected (and requeued when Isimud.retry_failures is set) and any declared exception handlers are called.
# File 'lib/isimud/bunny_client.rb', line 67
def subscribe(queue, options = {manual_ack: true}, &block)
  current_channel = channel
  queue.subscribe(options) do |delivery_info, properties, payload|
    begin
      log "Isimud: queue #{queue.name} received #{delivery_info.delivery_tag} routing_key: #{delivery_info.routing_key}"
      Thread.current['isimud_queue_name'] = queue.name
      Thread.current['isimud_delivery_info'] = delivery_info
      Thread.current['isimud_properties'] = properties
      block.call(payload)
      log "Isimud: queue #{queue.name} finished with #{delivery_info.delivery_tag}, acknowledging"
      current_channel.ack(delivery_info.delivery_tag)
    rescue => e
      log("Isimud: queue #{queue.name} error processing #{delivery_info.delivery_tag} payload #{payload.inspect}: #{e.class.name} #{e.message}\n #{e.backtrace.join("\n ")}", :warn)
      current_channel.reject(delivery_info.delivery_tag, Isimud.retry_failures)
      run_exception_handlers(e)
    end
  end
end
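The acknowledge-or-reject flow in subscribe can be reduced to a small, broker-free sketch. Everything named here is hypothetical (RecordingChannel, DeliveryInfo, handle_delivery, and the requeue and on_error parameters); the sketch stands in for the real channel and for Isimud.retry_failures and run_exception_handlers, and only shows the control flow.

```ruby
# Hypothetical stand-in for Bunny::Channel that records ack/reject calls.
class RecordingChannel
  attr_reader :acked, :rejected

  def initialize
    @acked = []
    @rejected = []
  end

  def ack(delivery_tag)
    @acked << delivery_tag
  end

  def reject(delivery_tag, requeue = false)
    @rejected << [delivery_tag, requeue]
  end
end

# Hypothetical stand-in for the delivery metadata passed to a consumer.
DeliveryInfo = Struct.new(:delivery_tag, :routing_key)

# Run the handler block; ack on success, reject and report on failure.
def handle_delivery(channel, delivery_info, payload, requeue: false, on_error: nil)
  yield payload
  channel.ack(delivery_info.delivery_tag)
rescue => e
  channel.reject(delivery_info.delivery_tag, requeue)
  on_error&.call(e)
end

channel = RecordingChannel.new
errors  = []

# A handler that returns normally leads to an ack.
handle_delivery(channel, DeliveryInfo.new(1, 'orders.created'), 'ok') { |p| p.upcase }

# A handler that raises leads to a reject and an error callback.
handle_delivery(channel, DeliveryInfo.new(2, 'orders.created'), 'bad',
                requeue: true, on_error: ->(e) { errors << e }) do |_payload|
  raise ArgumentError, 'cannot process'
end
```

Note that the exception does not propagate to the caller: it is absorbed after the reject, which is why registering exception handlers is the way to observe failures.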