Sha256: 73e1ea947a18325bf9a0e62392dc01a155d99f81472b1492acb7b3e00098ff66
Contents?: true
Size: 1.21 KB
Versions: 1
Compression:
Stored size: 1.21 KB
Contents
module Asynchronic
  module QueueEngine
    # Queue engine that stores its queues in Redis through the Ost library.
    #
    # Queue objects are built lazily the first time a name is looked up with
    # #[] and are cached for the lifetime of the engine instance.
    class Ost
      # Name of the queue used when the caller does not specify one.
      attr_reader :default_queue

      # @param options [Hash]
      # @option options [Object] :redis connection settings handed to
      #   ::Ost.connect; when the key is absent Ost's connection is untouched
      # @option options [Object] :default_queue overrides
      #   Asynchronic.default_queue
      def initialize(options={})
        ::Ost.connect options[:redis] if options.key?(:redis)
        @default_queue = options.fetch(:default_queue, Asynchronic.default_queue)
        # The default block builds and memoizes a Queue per requested name.
        @queues = Hash.new { |cache, name| cache[name] = Queue.new name }
      end

      # Returns the queue registered under +name+, creating it on first use.
      def [](name)
        @queues[name]
      end

      # Lists queue names as symbols: the union of the keys of the queues
      # cached by this engine and every Redis key matching 'ost:*'.
      def queues
        known_keys = @queues.values.map(&:key) | redis.keys('ost:*')
        # Drop the first four characters ('ost:' for keys matched by the
        # pattern above; cached queue keys presumably share that prefix).
        known_keys.map { |queue_key| queue_key.to_s[4..-1].to_sym }
      end

      # Forgets every cached queue and deletes all Redis keys matching 'ost:*'.
      def clear
        @queues.clear
        stale_keys = redis.keys('ost:*')
        # One variadic DEL instead of a round trip per key; DEL rejects an
        # empty argument list, hence the guard.
        redis.del(*stale_keys) unless stale_keys.empty?
      end

      # Builds a new Listener; every call returns a separate instance.
      def listener
        Listener.new
      end

      private

      # Memoized Redis connection created from ::Ost.options.
      def redis
        @redis ||= Redis.connect(::Ost.options)
      end

      # ::Ost::Queue extended with inspection helpers.
      class Queue < ::Ost::Queue
        # Issues rpop on the queue key and returns its result.
        def pop
          key.rpop
        end

        # True when Redis reports that the queue key does not exist.
        # NOTE(review): relies on redis.exists returning true/false; newer
        # redis-rb releases may return an Integer instead -- confirm against
        # the client version in use.
        def empty?
          !redis.exists(key)
        end

        # Number of elements returned by +items+.
        def size
          items.count
        end

        # The elements returned by +items+, in reverse order.
        def to_a
          items.reverse
        end
      end

      # Iterates a queue and lets the caller stop that iteration later.
      class Listener
        # Remembers +queue+ as the current queue and passes +block+ to its
        # +each+ method.
        def listen(queue, &block)
          @current_queue = queue
          queue.each(&block)
        end

        # Calls +stop+ on the queue given to the last #listen call.
        # Does nothing when #listen has not been called yet.
        def stop
          @current_queue.stop if @current_queue
        end
      end
    end
  end
end
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
asynchronic-0.1.0 | lib/asynchronic/queue_engine/ost.rb |