Sha256: 628d1f800fe1dc47b22c2c8ce7ad552a5f2d266c367976cdf707824eb0d2182a
Contents?: true
Size: 1.32 KB
Versions: 1
Compression:
Stored size: 1.32 KB
Contents
module Queuel module Base class Queue extend Introspect def initialize(client, queue_name) self.client = client self.name = queue_name end def peek(options = {}) raise NotImplementedError, "must implement #peek" end def push(message) raise NotImplementedError, "must implement #push" end def pop(options = {}, &block) bare_message = pop_bare_message(options) unless bare_message.nil? build_new_message(bare_message).tap { |message| if block_given? && message.present? message.delete if yield(message) end } end end def receive(options = {}, &block) poller_klass.new(self, block, options, thread_count).poll end private attr_accessor :client attr_accessor :name def thread_count Queuel.receiver_threads || 1 end def pop_bare_message(options = {}) raise NotImplementedError, "must implement bare Message getter" end def build_new_message(bare_message) message_klass.new(bare_message) end def message_klass self.class.const_with_nesting("Message") end def poller_klass self.class.const_with_nesting("Poller") end end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
queuel-0.1.0 | lib/queuel/base/queue.rb |