Sha256: 6a176b7d71dae669d2dab309e6a1d10afe094dcaff9ea770aa0ef1cbb3ece718
Contents?: true
Size: 1.26 KB
Versions: 3
Compression:
Stored size: 1.26 KB
Contents
module Asynchronic
  module QueueEngine
    # Queue engine that keeps Asynchronic queues in Redis through the Ost gem.
    #
    # Queue objects are built lazily, one per name, and cached on the engine.
    class Ost
      # @param options [Hash]
      # @option options [Object] :redis connection settings handed to
      #   ::Ost.connect; Ost is only (re)connected when this key is present
      # @option options [Object] :default_queue name returned by
      #   #default_queue; when nil/false, Asynchronic.default_queue is used
      def initialize(options = {})
        ::Ost.connect(options[:redis]) if options.key?(:redis)
        @default_queue = options[:default_queue]
        # Missing names are instantiated on first access and memoized.
        @queues = Hash.new { |cache, name| cache[name] = Queue.new(name) }
      end

      # Name of the default queue: the value given at construction or,
      # failing that, Asynchronic.default_queue (memoized on first call).
      def default_queue
        @default_queue ||= Asynchronic.default_queue
      end

      # Returns the cached Queue for +name+, creating it on first access.
      #
      # @param name [Object] queue name
      # @return [Queue]
      def [](name)
        @queues[name]
      end

      # Names of all known queues: the keys of the queues cached on this
      # engine merged (without duplicates) with every Redis key matching
      # 'ost:*'.
      #
      # @return [Array<Symbol>]
      def queues
        cached_keys = @queues.values.map(&:key)
        # Drop the first four characters of each key, i.e. the length of the
        # 'ost:' prefix matched above. Assumes the cached queues' keys carry
        # the same prefix -- TODO confirm against Ost::Queue#key.
        (cached_keys | redis.keys('ost:*')).map { |key| key.to_s[4..-1].to_sym }
      end

      # Forgets every cached queue and deletes every Redis key matching
      # 'ost:*'.
      def clear
        @queues.clear
        redis.keys('ost:*').each { |key| redis.del(key) }
      end

      # @return [Listener] a fresh listener with no queue attached
      def listener
        Listener.new
      end

      private

      # Memoized Redis connection built from the options Ost is using.
      def redis
        @redis ||= Redis.connect(::Ost.options)
      end

      # Ost queue extended with the inspection helpers Asynchronic relies on.
      class Queue < ::Ost::Queue
        # Removes and returns one element via RPOP issued through the key.
        def pop
          key.rpop
        end

        # True when the queue's key does not exist in Redis.
        #
        # NOTE(review): relies on Redis#exists returning a boolean. Newer
        # redis-rb releases return an Integer count (always truthy in Ruby)
        # from #exists -- confirm against the pinned client version.
        def empty?
          !redis.exists(key)
        end

        # Number of queued elements. Asks Redis for the list length instead
        # of loading every element just to count them.
        #
        # @return [Integer]
        def size
          key.llen
        end

        # Queued elements, in the reverse of the order #items returns them.
        #
        # @return [Array]
        def to_a
          items.reverse
        end
      end

      # Consumes a queue and allows that consumption to be interrupted.
      class Listener
        # Remembers +queue+ as the one being consumed and iterates it,
        # passing each element to the given block.
        #
        # @param queue [#each, #stop]
        def listen(queue, &block)
          @current_queue = queue
          queue.each(&block)
        end

        # Stops the queue handed to the last #listen call. Does nothing when
        # #listen has not been called yet (previously a NoMethodError on nil).
        def stop
          @current_queue.stop if @current_queue
        end
      end
    end
  end
end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
asynchronic-0.2.2 | lib/asynchronic/queue_engine/ost.rb |
asynchronic-0.2.1 | lib/asynchronic/queue_engine/ost.rb |
asynchronic-0.2.0 | lib/asynchronic/queue_engine/ost.rb |