Class: Isimud::BunnyClient

Inherits:
Client
  • Object
Defined in:
lib/isimud/bunny_client.rb

Overview

Interface for Bunny RabbitMQ client

Constant Summary

DEFAULT_URL =
'amqp://guest:guest@localhost'
CHANNEL_KEY =
isimud.bunny_client.channel

Instance Attribute Summary

Instance Method Summary

Methods inherited from Client

#on_exception, #run_exception_handlers

Methods included from Logging

#log, #logger

Constructor Details

- (BunnyClient) initialize(_url = nil, _bunny_options = {})

Initialize a new BunnyClient instance. The connection is established lazily: nothing is opened until a method that needs the connection (for example #connection or #channel) is called.

Parameters:

  • _url (String, Hash) (defaults to: nil)

    Server URL or options hash

  • _bunny_options (Hash) (defaults to: {})

    optional Bunny connection options

See Also:

  • Bunny.new for connection options


# File 'lib/isimud/bunny_client.rb', line 18

def initialize(_url = nil, _bunny_options = {})
  log "Isimud::BunnyClient.initialize: options = #{_bunny_options.inspect}"
  @url           = _url || DEFAULT_URL
  @bunny_options = _bunny_options
end

Instance Attribute Details

- (Object) url (readonly)

Returns the value of attribute url



# File 'lib/isimud/bunny_client.rb', line 11

def url
  @url
end

Instance Method Details

- (Bunny::Consumer) bind(queue_name, exchange_name, *routing_keys) {|payload| ... }

Convenience method that finds or creates a named queue, binds to an exchange, and subscribes to messages. If a block is provided, it will be called by the consumer each time a message is received.

Parameters:

  • queue_name (String)

    name of the queue

  • exchange_name (String)

    name of the AMQP exchange. Note that existing exchanges must be declared as Topic exchanges; otherwise, an error will occur

  • routing_keys (Array<String>)

    list of routing keys to be bound to the queue for the specified exchange.

Yield Parameters:

  • payload (String)

    message text

Returns:

  • (Bunny::Consumer)

    Bunny consumer interface, or nil if no block is given



# File 'lib/isimud/bunny_client.rb', line 33

def bind(queue_name, exchange_name, *routing_keys, &block)
  queue = create_queue(queue_name, exchange_name,
                       queue_options: {durable: true},
                       routing_keys:  routing_keys)
  subscribe(queue, &block) if block_given?
end
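
As a rough usage sketch, a caller might wire a handler to a queue as shown below. Everything here is an assumption made for illustration: FakeBindingClient is a stand-in that only mimics the #bind signature documented above so the example runs without a RabbitMQ server, and the queue, exchange, and routing key names are invented.

```ruby
# Stand-in that records what #bind receives and lets us push a payload into
# the registered block. Hypothetical; not part of Isimud.
class FakeBindingClient
  attr_reader :bindings

  def initialize
    @bindings = {}
    @handlers = {}
  end

  # Same parameter list as BunnyClient#bind.
  def bind(queue_name, exchange_name, *routing_keys, &block)
    @bindings[queue_name] = [exchange_name, routing_keys]
    @handlers[queue_name] = block
    nil
  end

  # Simulate the consumer receiving one message.
  def deliver(queue_name, payload)
    @handlers.fetch(queue_name).call(payload)
  end
end

# Hypothetical helper: subscribe to goal events and collect the payloads.
def watch_goals(client, sink)
  client.bind('goal_watcher', 'events', 'user.goal.*', 'team.goal.#') do |payload|
    sink << payload
  end
end
```

With a real BunnyClient the same call shape would apply, with the block invoked by the consumer for each message routed to the queue.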

- (Bunny::Channel) channel

Open a new, thread-specific AMQP connection channel, or return the current channel for this thread if it exists and is currently open. New channels are created with publisher confirms enabled. If Isimud.prefetch_count is set, it is applied as the prefetch limit for the new channel.

Returns:

  • (Bunny::Channel)

    channel instance.



# File 'lib/isimud/bunny_client.rb', line 107

def channel
  if (channel = Thread.current[CHANNEL_KEY]).try(:open?)
    channel
  else
    new_channel = connection.channel
    new_channel.confirm_select
    new_channel.prefetch(Isimud.prefetch_count) if Isimud.prefetch_count
    Thread.current[CHANNEL_KEY] = new_channel
  end
end
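
The per-thread caching described above can be sketched in isolation. This is a minimal illustration, not the library's code: SketchChannel, ChannelCache, and SKETCH_CHANNEL_KEY are hypothetical names, and the fake channel only models the open? check.

```ruby
# Hypothetical storage key, playing the role of CHANNEL_KEY.
SKETCH_CHANNEL_KEY = :sketch_channel

# Minimal stand-in for a channel: an id plus an open/closed flag.
SketchChannel = Struct.new(:id, :alive) do
  def open?
    alive
  end
end

class ChannelCache
  def initialize
    @next_id = 0
    @lock    = Mutex.new
  end

  # Return this thread's channel if it is still open, otherwise create one
  # and remember it under the current thread.
  def channel
    current = Thread.current[SKETCH_CHANNEL_KEY]
    return current if current && current.open?

    Thread.current[SKETCH_CHANNEL_KEY] = open_channel
  end

  private

  # The counter is shared between threads, so guard it with a mutex.
  def open_channel
    @lock.synchronize { SketchChannel.new(@next_id += 1, true) }
  end
end
```

Repeated calls on one thread return the same object, a second thread gets its own channel, and a closed channel is replaced on the next call.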

- (Object) close

Close the AMQP connection and clear it from the instance.

Returns:

  • nil



# File 'lib/isimud/bunny_client.rb', line 132

def close
  connection.close
ensure
  @connection = nil
end
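
The ensure clause is what guarantees the stored connection is cleared even when closing fails. A simplified sketch of that pattern follows; FlakySession and SessionHolder are hypothetical, and unlike the method above the sketch reads the instance variable directly instead of going through a lazy accessor.

```ruby
# Stand-in whose #close always fails, to exercise the ensure path.
class FlakySession
  def close
    raise IOError, 'socket already closed'
  end
end

class SessionHolder
  attr_reader :session

  def initialize(session)
    @session = session
  end

  # The error from #close still propagates to the caller, but the reference
  # is dropped either way.
  def close
    @session.close
  ensure
    @session = nil
  end
end
```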

- (Boolean?) connected?

Determine if a Bunny connection is currently established to the AMQP server.

Returns:

  • (Boolean, nil)

    true if a connection was established and is active or starting, false if a connection exists but is closed or closing, or nil if no connection has been established.



# File 'lib/isimud/bunny_client.rb', line 126

def connected?
  @connection && @connection.open?
end
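
The three possible return values follow from how && short-circuits in Ruby. A small sketch with a hypothetical stand-in session (StateSession and connection_state are illustrative names only):

```ruby
# Stand-in exposing only #open?.
StateSession = Struct.new(:alive) do
  def open?
    alive
  end
end

# Same expression shape as connected?: nil when there is no connection,
# otherwise whatever #open? reports.
def connection_state(connection)
  connection && connection.open?
end
```

Callers that need a strict boolean can wrap the result, for example with `!!client.connected?`.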

- (Bunny::Session) connection Also known as: connect

Establish a connection to the AMQP server, or return the current connection if one already exists

Returns:

  • (Bunny::Session)


# File 'lib/isimud/bunny_client.rb', line 95

def connection
  @connection ||= ::Bunny.new(url, @bunny_options).tap(&:start)
end
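
The `||=` plus `tap(&:start)` idiom creates and starts the session on first use, then reuses it. A minimal sketch under stated assumptions: CountingSession and LazyClient are hypothetical stand-ins, and the fake session only counts how often it is started.

```ruby
# Stand-in session that records how many times #start was called.
class CountingSession
  attr_reader :starts

  def initialize
    @starts = 0
  end

  def start
    @starts += 1
    self
  end
end

class LazyClient
  def initialize
    @connection = nil
  end

  # Created and started on the first call only; later calls return the
  # memoized object.
  def connection
    @connection ||= CountingSession.new.tap(&:start)
  end

  def connection_created?
    !@connection.nil?
  end
end
```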

- (Bunny::Queue) create_queue(queue_name, exchange_name, options = {})

Find or create a named queue and bind it to the specified exchange

Parameters:

  • queue_name (String)

    name of the queue

  • exchange_name (String)

    name of the AMQP exchange. Note that pre-existing exchanges must be declared as Topic exchanges; otherwise, an error will occur

  • options (Hash) (defaults to: {})

    queue declaration options

Options Hash (options):

  • :queue_options (Hash) — default: {durable: true}

    queue declaration options; see Bunny::Channel#queue

  • :routing_keys (Array<String>) — default: []

    routing keys to be bound to the queue. Keys are words separated by dots: “*” matches exactly one word, and “#” matches zero or more words.

Returns:

  • (Bunny::Queue)

    Bunny queue



# File 'lib/isimud/bunny_client.rb', line 50

def create_queue(queue_name, exchange_name, options = {})
  queue_options = options[:queue_options] || {durable: true}
  routing_keys  = options[:routing_keys] || []
  log "Isimud::BunnyClient: create_queue #{queue_name}: queue_options=#{queue_options.inspect}"
  queue = find_queue(queue_name, queue_options)
  bind_routing_keys(queue, exchange_name, routing_keys) if routing_keys.any?
  queue
end
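
To make the wildcard rules for routing keys concrete, here is a small matcher written for illustration. In practice the broker performs this matching; topic_match? and match_words are hypothetical helpers, not part of Isimud or Bunny.

```ruby
# True if a dot-separated routing key matches a topic binding pattern.
def topic_match?(pattern, routing_key)
  match_words(pattern.split('.'), routing_key.split('.'))
end

def match_words(pattern, words)
  return words.empty? if pattern.empty?

  head, *rest = pattern
  case head
  when '#'
    # '#' may absorb zero or more words, so try every split point.
    (0..words.length).any? { |n| match_words(rest, words.drop(n)) }
  when '*'
    # '*' must consume exactly one word.
    !words.empty? && match_words(rest, words.drop(1))
  else
    words.first == head && match_words(rest, words.drop(1))
  end
end
```

For example, `user.*.complete` matches `user.goal.complete`, while `user.*` does not, because `*` stands for exactly one word.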

- (AMQ::Protocol::Queue::DeleteOk) delete_queue(queue_name)

Permanently delete the queue from the AMQP server. Any messages present in the queue will be discarded.

Parameters:

  • queue_name (String)

    queue name

Returns:

  • (AMQ::Protocol::Queue::DeleteOk)

    RabbitMQ response



# File 'lib/isimud/bunny_client.rb', line 89

def delete_queue(queue_name)
  channel.queue_delete(queue_name)
end

- (Object) find_queue(queue_name, options = {durable: true})

Look up a queue by name, or create it if it does not already exist.



# File 'lib/isimud/bunny_client.rb', line 156

def find_queue(queue_name, options = {durable: true})
  channel.queue(queue_name, options)
end

- (Object) publish(exchange, routing_key, payload)

Publish a message to the specified exchange, which is declared as a durable topic exchange. Note that the message is always persisted.

Parameters:

  • exchange (String)

    AMQP exchange name

  • routing_key (String)

    message routing key. This should always be in the form of words separated by dots e.g. “user.goal.complete”



# File 'lib/isimud/bunny_client.rb', line 144

def publish(exchange, routing_key, payload)
  channel.topic(exchange, durable: true).publish(payload, routing_key: routing_key, persistent: true)
end
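
A caller might wrap publish in a small helper that builds the dotted routing key and serializes the payload. The sketch below is illustrative only: RecordingPublisher is a stand-in with the same #publish signature so the example runs without a broker, publish_event is a hypothetical helper, and the choice of JSON as the payload format is an assumption.

```ruby
require 'json'

# Stand-in that records published messages instead of sending them.
class RecordingPublisher
  attr_reader :messages

  def initialize
    @messages = []
  end

  # Same parameter list as BunnyClient#publish.
  def publish(exchange, routing_key, payload)
    @messages << [exchange, routing_key, payload]
  end
end

# Hypothetical helper: join the words into a routing key such as
# "user.goal.complete" and send the attributes as a JSON string.
def publish_event(client, exchange, words, attributes)
  client.publish(exchange, words.join('.'), JSON.generate(attributes))
end
```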

- (Bunny::Session) reconnect

Close and reopen the AMQP connection

Returns:

  • (Bunny::Session)


# File 'lib/isimud/bunny_client.rb', line 150

def reconnect
  close
  connect
end

- (Object) reset

Reset this client by closing all channels for the connection.



# File 'lib/isimud/bunny_client.rb', line 119

def reset
  connection.close_all_channels
end

- (Object) subscribe(queue, options = {manual_ack: true}) {|payload| ... }

Subscribe to messages on the Bunny queue. The provided block will be called each time a message is received. The message is acknowledged and removed from the queue unless the block raises an exception. If an exception is raised, the message is rejected (and requeued when Isimud.retry_failures is set), and any declared exception handlers are called.

Parameters:

  • queue (Bunny::Queue)

    Bunny queue

  • options (Hash) (defaults to: {manual_ack: true})

    subscription options; see Bunny::Queue#subscribe

Yield Parameters:

  • payload (String)

    message text



# File 'lib/isimud/bunny_client.rb', line 67

def subscribe(queue, options = {manual_ack: true}, &block)
  current_channel = channel
  queue.subscribe(options) do |delivery_info, properties, payload|
    begin
      log "Isimud: queue #{queue.name} received #{delivery_info.delivery_tag} routing_key: #{delivery_info.routing_key}"
      Thread.current['isimud_queue_name']    = queue.name
      Thread.current['isimud_delivery_info'] = delivery_info
      Thread.current['isimud_properties']    = properties
      block.call(payload)
      log "Isimud: queue #{queue.name} finished with #{delivery_info.delivery_tag}, acknowledging"
      current_channel.ack(delivery_info.delivery_tag)
    rescue => e
      log("Isimud: queue #{queue.name} error processing #{delivery_info.delivery_tag} payload #{payload.inspect}: #{e.class.name} #{e.message}\n  #{e.backtrace.join("\n  ")}", :warn)
      current_channel.reject(delivery_info.delivery_tag, Isimud.retry_failures)
      run_exception_handlers(e)
    end
  end
end
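
The acknowledge-or-reject flow can be reduced to a few lines. This is a simplified sketch, not the library's implementation: RecordingChannel is a stand-in that only records ack and reject calls, and handle_delivery with its requeue: and on_error: keywords is a hypothetical helper.

```ruby
# Stand-in channel that records acknowledgements and rejections.
class RecordingChannel
  attr_reader :acked, :rejected

  def initialize
    @acked    = []
    @rejected = []
  end

  def ack(tag)
    @acked << tag
  end

  def reject(tag, requeue = false)
    @rejected << [tag, requeue]
  end
end

# Run the handler; acknowledge on success, otherwise reject and notify.
def handle_delivery(channel, tag, payload, requeue: false, on_error: nil)
  yield payload
  channel.ack(tag)
rescue StandardError => e
  channel.reject(tag, requeue)
  on_error&.call(e)
end
```

Because the exception is rescued inside the helper, one failing message does not stop later messages from being processed.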