Sha256: 628d1f800fe1dc47b22c2c8ce7ad552a5f2d266c367976cdf707824eb0d2182a

Contents?: true

Size: 1.32 KB

Versions: 1

Compression:

Stored size: 1.32 KB

Contents

module Queuel
  module Base
    class Queue
      extend Introspect

      def initialize(client, queue_name)
        self.client = client
        self.name = queue_name
      end

      def peek(options = {})
        raise NotImplementedError, "must implement #peek"
      end

      def push(message)
        raise NotImplementedError, "must implement #push"
      end

      def pop(options = {}, &block)
        bare_message = pop_bare_message(options)
        unless bare_message.nil?
          build_new_message(bare_message).tap { |message|
            if block_given? && message.present?
              message.delete if yield(message)
            end
          }
        end
      end

      def receive(options = {}, &block)
        poller_klass.new(self, block, options, thread_count).poll
      end

      private
      attr_accessor :client
      attr_accessor :name

      def thread_count
        Queuel.receiver_threads || 1
      end

      def pop_bare_message(options = {})
        raise NotImplementedError, "must implement bare Message getter"
      end

      def build_new_message(bare_message)
        message_klass.new(bare_message)
      end

      def message_klass
        self.class.const_with_nesting("Message")
      end

      def poller_klass
        self.class.const_with_nesting("Poller")
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
queuel-0.1.0 lib/queuel/base/queue.rb