Sha256: 68b5a6600be22b591bae2039405e2af7a33dad03661c57a893af7b570b033c76

Contents?: true

Size: 1.34 KB

Versions: 8

Compression:

Stored size: 1.34 KB

Contents

module Asynchronic
  module QueueEngine
    # Queue engine built on top of the Ost library, with Redis as storage.
    #
    # Queue wrappers are created lazily on first access through #[] and cached
    # per name. #queues and #clear look for Redis keys matching 'ost:*'.
    class Ost

      # Builds the engine.
      #
      # @param options [Hash] engine settings
      # @option options [Object] :redis passed to ::Ost.connect; when the key
      #   is absent ::Ost.connect is not called at all
      # @option options [Object] :default_queue value returned by
      #   #default_queue; when nil, Asynchronic.default_queue is used instead
      def initialize(options = {})
        ::Ost.connect options[:redis] if options.key?(:redis)
        @default_queue = options[:default_queue]
        # Plain assignment: a freshly allocated instance has no @queues yet.
        @queues = Hash.new { |cache, name| cache[name] = Queue.new(name) }
      end

      # Name of the default queue.
      #
      # Written by hand instead of attr_reader so that it can fall back to
      # Asynchronic.default_queue (and memoize it) when none was configured.
      #
      # @return [Object] the configured or fallback queue name
      def default_queue
        @default_queue ||= Asynchronic.default_queue
      end

      # Fetches the queue wrapper for +name+, creating and caching it on
      # first access.
      #
      # @param name [Object] queue name
      # @return [Queue]
      def [](name)
        @queues[name]
      end

      # Lists queue names known either to this instance or to Redis.
      #
      # @return [Array<Symbol>] names with the leading four characters removed
      def queues
        keys = @queues.values.map(&:key) | redis.keys('ost:*')
        # [4..-1] strips the 'ost:' prefix that the Redis pattern guarantees;
        # cached queue keys are assumed to carry the same prefix — TODO confirm.
        keys.map { |key| key.to_s[4..-1].to_sym }
      end

      # Drops every cached queue wrapper and deletes each Redis key that
      # matches 'ost:*'.
      def clear
        @queues.clear
        redis.keys('ost:*').each { |key| redis.del key }
      end

      # @return [Listener] a new listener with no queue attached yet
      def listener
        Listener.new
      end

      private

      # Memoized Redis client built from ::Ost.options.
      def redis
        @redis ||= Redis.connect(::Ost.options)
      end


      # ::Ost::Queue extended with inspection helpers.
      #
      # +key+, +redis+ and +items+ are not defined here; presumably they are
      # inherited from ::Ost::Queue — verify against the Ost version in use.
      class Queue < ::Ost::Queue

        # Calls +rpop+ on the queue key and returns its result.
        def pop
          key.rpop
        end

        # Tells whether the queue key is missing from Redis.
        #
        # @return [Boolean] true when Redis reports no such key
        def empty?
          found = redis.exists(key)
          # The reply may be a boolean or an Integer count depending on the
          # Redis client; 0 is truthy in Ruby, so a bare negation would
          # misreport a missing key as present.
          !found || found == 0
        end

        # @return [Integer] number of entries in +items+
        def size
          items.count
        end

        # @return [Array] +items+ in reverse order
        def to_a
          items.reverse
        end

      end


      # Consumes a queue and allows the consumption to be stopped.
      class Listener

        # Iterates +queue+, yielding to +block+, inside
        # Asynchronic.retry_execution. The queue is remembered so that #stop
        # can reach it later.
        #
        # @param queue [#each, #stop] queue to consume
        def listen(queue, &block)
          @current_queue = queue
          Asynchronic.retry_execution(self.class, 'listen') do
            queue.each(&block)
          end
        end

        # Calls +stop+ on the queue given to the latest #listen call.
        # Does nothing when #listen has not been called yet.
        def stop
          @current_queue.stop if @current_queue
        end

      end

    end
  end
end

Version data entries

8 entries across 8 versions & 1 rubygem

Version Path
asynchronic-1.2.1 lib/asynchronic/queue_engine/ost.rb
asynchronic-1.2.0 lib/asynchronic/queue_engine/ost.rb
asynchronic-1.1.1 lib/asynchronic/queue_engine/ost.rb
asynchronic-1.1.0 lib/asynchronic/queue_engine/ost.rb
asynchronic-1.0.0 lib/asynchronic/queue_engine/ost.rb
asynchronic-0.3.1 lib/asynchronic/queue_engine/ost.rb
asynchronic-0.3.0 lib/asynchronic/queue_engine/ost.rb
asynchronic-0.2.3 lib/asynchronic/queue_engine/ost.rb