Sha256: 40e6329be699ec90afe996b71d835ac3cb8703ee89e4da7d38c96bb5cd5da702
Contents?: true
Size: 1.74 KB
Versions: 3
Compression:
Stored size: 1.74 KB
Contents
module Queuel
  module Base
    # Abstract queue that concrete engine queues build upon.
    #
    # #peek, #push and the private #pop_bare_message raise NotImplementedError
    # here, so an engine-specific subclass has to supply them. The Message and
    # Poller classes are looked up through +const_with_nesting+, a class-level
    # helper that comes from extending Introspect (defined elsewhere).
    class Queue
      extend Introspect

      # client     - engine client object; stored as-is.
      # queue_name - name of the queue; stored as-is.
      def initialize(client, queue_name)
        self.client = client
        self.name = queue_name
      end

      # Abstract: always raises NotImplementedError in this base class.
      def peek(options = {})
        raise NotImplementedError, "must implement #peek"
      end

      # Abstract: always raises NotImplementedError in this base class.
      def push(message, options = {})
        raise NotImplementedError, "must implement #push"
      end

      # Fetches one message and wraps it in the engine's Message class.
      #
      # options - Hash; entries whose stringified key is listed in
      #           #message_option_keys go to the Message constructor, the
      #           rest go to #pop_bare_message.
      #
      # When a block is given and the message reports +present?+, the message
      # is yielded and deleted if the block returns a truthy value.
      #
      # Returns the built message, or nil when no bare message was obtained.
      def pop(options = {}, &block)
        # NOTE(review): assumes Queuel::Hash#partition returns a
        # [matching, rest] pair usable as option hashes - confirm.
        message_options, engine_options = Queuel::Hash.new(options).partition { |(key, _)|
          message_option_keys.include?(key.to_s)
        }

        raw = pop_bare_message(engine_options)
        return nil if raw.nil?

        built = build_new_message(raw, message_options)
        built.delete if block_given? && built.present? && yield(built)
        built
      end

      # Builds a poller from this queue, the given block, the options and the
      # thread count, then returns the result of its #poll.
      def receive(options = {}, &block)
        poller = poller_klass.new(self, block, options, thread_count)
        poller.poll
      end

      private

      attr_accessor :client, :name

      # Option keys (as strings) that #pop routes to the Message constructor.
      def message_option_keys
        %w[encode encoder decode decoder]
      end

      # Wraps +message+ in a new Message (no bare message) and returns that
      # Message's raw_body.
      def build_push_message(message, options = {})
        outgoing = message_klass.new(nil, options)
        outgoing.body = message
        outgoing.raw_body
      end

      # Thread count passed to the poller: Queuel.receiver_threads, or 1 when
      # that is nil/false.
      def thread_count
        Queuel.receiver_threads || 1
      end

      # Abstract: always raises NotImplementedError in this base class.
      def pop_bare_message(options = {})
        raise NotImplementedError, "must implement bare Message getter"
      end

      # Wraps a bare engine message in the engine's Message class.
      def build_new_message(bare_message, options = {})
        message_klass.new(bare_message, options)
      end

      # Message class resolved via const_with_nesting on this queue's class.
      def message_klass
        self.class.const_with_nesting("Message")
      end

      # Poller class resolved via const_with_nesting on this queue's class.
      def poller_klass
        self.class.const_with_nesting("Poller")
      end
    end
  end
end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
queuel-0.3.1 | lib/queuel/base/queue.rb |
queuel-0.3.0 | lib/queuel/base/queue.rb |
queuel-0.2.0 | lib/queuel/base/queue.rb |