Sha256: 40e6329be699ec90afe996b71d835ac3cb8703ee89e4da7d38c96bb5cd5da702

Contents?: true

Size: 1.74 KB

Versions: 3

Compression:

Stored size: 1.74 KB

Contents

module Queuel
  module Base
    # Abstract queue that engine-specific queues build on.
    #
    # Subclasses are expected to supply #peek, #push and the private
    # #pop_bare_message; the versions here only raise NotImplementedError.
    # The Message and Poller classes used by an instance are looked up through
    # the class-level `const_with_nesting` helper (presumably provided by
    # Introspect -- confirm against that module).
    class Queue
      extend Introspect

      # @param client the engine client this queue talks through
      # @param queue_name the name stored as this queue's +name+
      def initialize(client, queue_name)
        # Assigned through the (private) writers rather than ivars directly.
        self.client = client
        self.name = queue_name
      end

      # Placeholder; always raises until a subclass overrides it.
      #
      # @raise [NotImplementedError]
      def peek(options = {})
        raise NotImplementedError, "must implement #peek"
      end

      # Placeholder; always raises until a subclass overrides it.
      #
      # @raise [NotImplementedError]
      def push(message, options = {})
        raise NotImplementedError, "must implement #push"
      end

      # Fetches one bare message via #pop_bare_message and wraps it with
      # #build_new_message.
      #
      # +options+ is split in two: entries whose stringified key appears in
      # #message_option_keys go to the message wrapper, the rest go to
      # #pop_bare_message (this assumes Queuel::Hash#partition orders its
      # results like Enumerable#partition -- confirm against Queuel::Hash).
      #
      # When a block is given and the wrapped message reports +present?+, the
      # message is yielded and +delete+ is called on it if the block returns a
      # truthy value.
      #
      # @return the wrapped message, or nil when no bare message was obtained
      def pop(options = {}, &block)
        message_options, engine_options = Queuel::Hash.new(options).partition do |(key, _)|
          message_option_keys.include?(key.to_s)
        end

        bare_message = pop_bare_message(engine_options)
        return if bare_message.nil?

        message = build_new_message(bare_message, message_options)
        message.delete if block_given? && message.present? && yield(message)
        message
      end

      # Builds a poller (see #poller_klass) from this queue, the given block,
      # +options+ and #thread_count, then calls +poll+ on it.
      #
      # @return whatever the poller's +poll+ returns
      def receive(options = {}, &block)
        poller = poller_klass.new(self, block, options, thread_count)
        poller.poll
      end

      private

      attr_accessor :client, :name

      # Option keys (compared as strings) that #pop routes to the message
      # wrapper instead of to #pop_bare_message.
      def message_option_keys
        %w[encode encoder decode decoder]
      end

      # Creates a message with no bare payload, assigns +message+ as its body
      # and returns the resulting +raw_body+.
      def build_push_message(message, options = {})
        outgoing = message_klass.new(nil, options)
        outgoing.body = message
        outgoing.raw_body
      end

      # Thread count handed to the poller: Queuel.receiver_threads, or 1 when
      # that value is nil/false.
      def thread_count
        configured = Queuel.receiver_threads
        configured || 1
      end

      # Placeholder; always raises until a subclass overrides it.
      #
      # @raise [NotImplementedError]
      def pop_bare_message(options = {})
        raise NotImplementedError, "must implement bare Message getter"
      end

      # Wraps a bare message in an instance of #message_klass.
      def build_new_message(bare_message, options = {})
        message_klass.new(bare_message, options)
      end

      # Class used to wrap messages, looked up by the name "Message".
      def message_klass
        owner = self.class
        owner.const_with_nesting("Message")
      end

      # Class used for polling in #receive, looked up by the name "Poller".
      def poller_klass
        owner = self.class
        owner.const_with_nesting("Poller")
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
queuel-0.3.1 lib/queuel/base/queue.rb
queuel-0.3.0 lib/queuel/base/queue.rb
queuel-0.2.0 lib/queuel/base/queue.rb