lib/ears/consumer.rb in ears-0.3.1 vs lib/ears/consumer.rb in ears-0.3.2
- old
+ new
@@ -1,46 +1,79 @@
require 'bunny'
module Ears
+ # The abstract base class for consumers processing messages from queues.
+ # @abstract Subclass and override {#work} to implement.
class Consumer
+ # Error that is raised when an invalid value is returned from {#work}
class InvalidReturnError < StandardError
def initialize(value)
super(
"#work must return :ack, :reject or :requeue, received #{value.inspect} instead",
)
end
end
+ # List of registered middlewares. Register new middlewares with {.use}.
+ # @return [Array<Ears::Middleware>]
def self.middlewares
@middlewares ||= []
end
+ # Registers a new middleware by instantiating +middleware+ and passing it +opts+.
+ #
+ # @param [Class<Ears::Middleware>] middleware The middleware class to instantiate and register.
+ # @param [Hash] opts The options for instantiating the middleware.
def self.use(middleware, opts = {})
middlewares << middleware.new(opts)
end
+ # The method that is called when a message from the queue is received.
+ # Keep in mind that the parameters received can be altered by middlewares!
+ #
+ # @param [Bunny::DeliveryInfo] delivery_info The delivery info of the message.
+ # @param [Bunny::MessageProperties] metadata The metadata of the message.
+ # @param [String] payload The payload of the message.
+ #
+ # @return [:ack, :reject, :requeue] A symbol denoting what should be done with the message.
def work(delivery_info, metadata, payload)
raise NotImplementedError
end
+ # Wraps #work to add middlewares. This is being called by Ears when a message is received for the consumer.
+ #
+ # @param [Bunny::DeliveryInfo] delivery_info The delivery info of the received message.
+ # @param [Bunny::MessageProperties] metadata The metadata of the received message.
+ # @param [String] payload The payload of the received message.
+ # @raise [InvalidReturnError] if you return something other than +:ack+, +:reject+ or +:requeue+ from {#work}.
def process_delivery(delivery_info, metadata, payload)
self.class.middlewares.reverse.reduce(
work_proc,
) do |next_middleware, middleware|
nest_middleware(middleware, next_middleware)
end.call(delivery_info, metadata, payload)
end
protected
+ # Helper method to ack a message.
+ #
+ # @return [:ack]
def ack
:ack
end
+ # Helper method to reject a message.
+ #
+ # @return [:reject]
+ #
def reject
:reject
end
+ # Helper method to requeue a message.
+ #
+ # @return [:requeue]
def requeue
:requeue
end
private