Sha256: 6a176b7d71dae669d2dab309e6a1d10afe094dcaff9ea770aa0ef1cbb3ece718

Contents?: true

Size: 1.26 KB

Versions: 3

Compression:

Stored size: 1.26 KB

Contents

module Asynchronic
  module QueueEngine
    # Queue engine built on top of the Ost gem (Redis-backed queues).
    #
    # Queue objects are created lazily the first time a name is requested
    # through #[] and are cached for the lifetime of the engine instance.
    class Ost

      # options - Hash of settings:
      #   :redis         - when the key is present, its value is handed to
      #                    ::Ost.connect.
      #   :default_queue - name of the default queue; when omitted,
      #                    #default_queue falls back to
      #                    Asynchronic.default_queue.
      def initialize(options={})
        ::Ost.connect options[:redis] if options.key?(:redis)
        @default_queue = options[:default_queue]
        # Auto-vivifying cache: looking up an unknown name builds its Queue.
        @queues = Hash.new { |h, k| h[k] = Queue.new k }
      end

      # Returns the configured default queue name, resolving (and memoizing)
      # Asynchronic.default_queue when none was given to the constructor.
      def default_queue
        @default_queue ||= Asynchronic.default_queue
      end

      # Returns the Queue registered under +name+, creating it on first access.
      def [](name)
        @queues[name]
      end

      # Returns the queue names, as symbols, gathered from both the locally
      # cached queues and every Redis key matching 'ost:*'.
      def queues
        local_keys = @queues.values.map(&:key)
        # [4..-1] drops the leading 'ost:' namespace from each key.
        (local_keys | redis.keys('ost:*')).map { |q| q.to_s[4..-1].to_sym }
      end

      # Forgets all cached queues and deletes every Redis key matching 'ost:*'.
      def clear
        @queues.clear
        redis.keys('ost:*').each { |k| redis.del k }
      end

      # Returns a fresh Listener for consuming a queue.
      def listener
        Listener.new
      end

      private

      # Memoized Redis connection built from the options held by ::Ost.
      def redis
        @redis ||= Redis.connect(::Ost.options)
      end


      # ::Ost::Queue extended with the inspection helpers used by this engine.
      # +key+, +items+ and +redis+ are expected to come from ::Ost::Queue
      # (NOTE(review): confirm against the Ost version in use).
      class Queue < ::Ost::Queue

        # Issues RPOP on the queue key and returns its result.
        def pop
          key.rpop
        end

        # True when Redis reports that the queue key does not exist.
        def empty?
          !redis.exists(key)
        end

        # Number of items currently held by the queue.
        def size
          items.count
        end

        # Queue items in the reverse of the order returned by +items+.
        def to_a
          items.reverse
        end

      end


      # Consumes a single queue and allows that consumption to be stopped.
      class Listener

        # Remembers +queue+ as the one being consumed and passes each of its
        # items to +block+ via queue.each.
        def listen(queue, &block)
          @current_queue = queue
          queue.each(&block)
        end

        # Asks the queue currently being listened to (if any) to stop.
        # Safe to call before #listen: it is then a no-op instead of raising
        # NoMethodError on nil.
        def stop
          @current_queue.stop if @current_queue
        end

      end

    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
asynchronic-0.2.2 lib/asynchronic/queue_engine/ost.rb
asynchronic-0.2.1 lib/asynchronic/queue_engine/ost.rb
asynchronic-0.2.0 lib/asynchronic/queue_engine/ost.rb