Sha256: 73e1ea947a18325bf9a0e62392dc01a155d99f81472b1492acb7b3e00098ff66

Contents?: true

Size: 1.21 KB

Versions: 1

Compression:

Stored size: 1.21 KB

Contents

module Asynchronic
  module QueueEngine
    class Ost
      
      # Default queue identifier: options[:default_queue] when it is given to
      # #initialize, otherwise Asynchronic.default_queue.
      attr_reader :default_queue

      # Builds the engine.
      #
      # options - Hash of settings (default: {}):
      #           :redis         - when this key is present, its value is
      #                            handed to ::Ost.connect.
      #           :default_queue - stored as #default_queue; when absent,
      #                            Asynchronic.default_queue is used instead.
      def initialize(options={})
        if options.key?(:redis)
          ::Ost.connect(options[:redis])
        end

        fallback_queue = Asynchronic.default_queue
        @default_queue = options.fetch(:default_queue, fallback_queue)

        # Registry of Queue objects, each one created the first time its name
        # is looked up.
        @queues ||= Hash.new do |registry, queue_name|
          registry[queue_name] = Queue.new(queue_name)
        end
      end

      # Looks +name+ up in the queue registry built by #initialize.
      #
      # name - key identifying the queue.
      #
      # Returns the registry entry for +name+; the registry's default block
      # creates and stores a Queue on the first lookup of an unknown name.
      def [](name)
        queue = @queues[name]
        queue
      end

      # Lists the names of every known queue.
      #
      # Combines the queues already instantiated through this engine with all
      # keys currently matching 'ost:*' in Redis, strips the 'ost:' prefix and
      # returns each name once, as a Symbol. Locally known queues come first.
      #
      # Keys are converted to plain Strings before they are merged so that
      # duplicates are removed even when a queue's key object does not compare
      # equal to the raw String returned by Redis.
      #
      # NOTE(review): relies on the KEYS command, which scans the whole
      # keyspace - confirm this is acceptable for large databases.
      #
      # Returns an Array of Symbols.
      def queues
        prefix = 'ost:'
        local_keys  = @queues.values.map { |queue| queue.key.to_s }
        remote_keys = redis.keys('ost:*').map(&:to_s)

        (local_keys | remote_keys).map { |key| key[prefix.length..-1].to_sym }
      end

      # Forgets every queue instantiated through this engine and deletes all
      # Redis keys matching 'ost:*', one DEL call per key.
      #
      # Returns the Array of keys that were found.
      def clear
        @queues.clear

        stale_keys = redis.keys('ost:*')
        stale_keys.each do |stale_key|
          redis.del stale_key
        end
      end

      # Creates a Listener.
      #
      # Returns a new Listener instance on every call.
      def listener
        fresh_listener = Listener.new
        fresh_listener
      end

      private

      # Redis client shared by this engine's methods. It is created on the
      # first call from ::Ost.options and memoized in @redis afterwards.
      def redis
        return @redis if @redis

        @redis = Redis.connect(::Ost.options)
      end


      # ::Ost::Queue extended with a one-shot #pop and inspection helpers
      # (#empty?, #size, #to_a).
      class Queue < ::Ost::Queue

        # Removes one element from the list with RPOP on the queue key.
        #
        # Returns the removed element, or nil when there is none.
        def pop
          key.rpop
        end

        # Tells whether the queue currently holds no elements.
        #
        # Based on the list length rather than on negating Redis#exists:
        # depending on the redis-rb version, #exists answers with an Integer,
        # and 0 is truthy in Ruby, so the negation would never report an empty
        # queue. A missing key has length 0, so the result is the same.
        #
        # Returns true or false.
        def empty?
          size.zero?
        end

        # Number of elements stored in the queue.
        #
        # Asks Redis for the length (LLEN) instead of loading every element
        # through #items only to count them.
        # NOTE(review): assumes the inherited #items returns the complete list
        # stored at +key+ - confirm against ::Ost::Queue.
        #
        # Returns an Integer.
        def size
          key.llen
        end

        # Returns the inherited #items as an Array in reverse order.
        # NOTE(review): presumably this puts the element #pop would return
        # next at the front - confirm against the push side in ::Ost::Queue.
        def to_a
          items.reverse
        end

      end


      # Consumes a queue and lets another caller interrupt the consumption.
      class Listener

        # Iterates over +queue+, handing every yielded element to the block.
        # The queue is remembered so that #stop can reach it.
        #
        # queue - object responding to #each and #stop.
        #
        # Returns whatever queue.each returns.
        def listen(queue, &block)
          @current_queue = queue
          queue.each(&block)
        end

        # Forwards #stop to the queue most recently passed to #listen.
        #
        # Does nothing and returns nil when #listen has not been called yet,
        # so a stop request that arrives before the first listen is harmless
        # instead of raising NoMethodError on nil.
        def stop
          @current_queue.stop if @current_queue
        end

      end

    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
asynchronic-0.1.0 lib/asynchronic/queue_engine/ost.rb