Sha256: 7fd8754f6b64ce01aa7bbe15eefd2f3b958507d033b03a9207bbd7e5c0c81aaf

Contents?: true

Size: 1.6 KB

Versions: 1

Compression:

Stored size: 1.6 KB

Contents

require 'bunny'

module Ears
  class Consumer < Bunny::Consumer
    class InvalidReturnError < StandardError
      def initialize(value)
        super(
          "#work must return :ack, :reject or :requeue, received #{value.inspect} instead",
        )
      end
    end

    def self.middlewares
      @middlewares ||= []
    end

    def self.use(middleware)
      middlewares << middleware
    end

    def work(delivery_info, metadata, payload)
      raise NotImplementedError
    end

    def process_delivery(delivery_info, metadata, payload)
      self.class.middlewares.reverse.reduce(
        work_proc,
      ) do |next_middleware, middleware|
        nest_middleware(middleware, next_middleware)
      end.call(delivery_info, metadata, payload)
    end

    protected

    def ack
      :ack
    end

    def reject
      :reject
    end

    def requeue
      :requeue
    end

    private

    def work_proc
      ->(delivery_info, metadata, payload) do
        work(delivery_info, metadata, payload).tap do |result|
          process_result(result, delivery_info.delivery_tag)
        end
      end
    end

    def nest_middleware(middleware, next_middleware)
      ->(delivery_info, metadata, payload) do
        middleware.call(delivery_info, metadata, payload, next_middleware)
      end
    end

    def process_result(result, delivery_tag)
      case result
      when :ack
        channel.ack(delivery_tag, false)
      when :reject
        channel.reject(delivery_tag)
      when :requeue
        channel.reject(delivery_tag, true)
      else
        raise InvalidReturnError, result
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
ears-0.1.0 lib/ears/consumer.rb