Sha256: 7c802d1bca6c5b179776029116cc129147a9dd45faa6f3d644874b9bf054d43e

Contents?: true

Size: 1.16 KB

Versions: 1

Compression:

Stored size: 1.16 KB

Contents

module Asynchronic
  module QueueEngine
    # Queue engine that keeps every queue in the memory of the current
    # process. Queues are created lazily the first time they are looked up
    # and are keyed by Symbol, so +engine['jobs']+ and +engine[:jobs]+ refer
    # to the same queue and #queues never reports duplicates.
    class InMemory

      attr_reader :default_queue

      # @param options [Hash]
      # @option options [Object] :default_queue name exposed through
      #   #default_queue; falls back to +Asynchronic.default_queue+.
      def initialize(options={})
        # Block form: the global default is only looked up when the option
        # is actually missing.
        @default_queue = options.fetch(:default_queue) { Asynchronic.default_queue }
        @queues = Hash.new { |h,k| h[k] = Queue.new }
      end

      # Returns the queue registered under +name+, creating it on first use.
      #
      # @param name [#to_sym] queue name; normalised to a Symbol so lookups
      #   agree with the names reported by #queues.
      # @return [Queue]
      def [](name)
        @queues[name.to_sym]
      end

      # @return [Array<Symbol>] names of the queues created so far.
      def queues
        @queues.keys
      end

      # Forgets every queue (and therefore every pending message).
      def clear
        @queues.clear
      end

      # @return [Listener] a fresh listener; it is not bound to any queue
      #   until Listener#listen is called.
      def listener
        Listener.new
      end


      # FIFO list of messages. #push, #pop and #to_a run under a mutex;
      # #size and #empty? are forwarded straight to the backing array.
      class Queue

        extend Forwardable

        def_delegators :@queue, :size, :empty?

        def initialize
          @queue = []
          @mutex = Mutex.new
        end

        # Removes and returns the oldest message.
        #
        # @return [Object, nil] nil when the queue is empty.
        def pop
          @mutex.synchronize { @queue.shift }
        end

        # Appends +message+ to the end of the queue.
        #
        # NOTE: the return value is whatever Array#push returns (the backing
        # array); it is kept for backward compatibility and must be treated
        # as read-only by callers.
        def push(message)
          @mutex.synchronize { @queue.push message }
        end

        # Snapshot of the pending messages, oldest first.
        #
        # A copy is returned on purpose: Array#to_a hands back the receiver
        # itself, which would let callers mutate the queue behind the mutex.
        #
        # @return [Array]
        def to_a
          @mutex.synchronize { @queue.dup }
        end

      end


      # Polls a queue and hands each message to a block until #stop is called.
      class Listener

        # Seconds to wait before polling again after finding the queue empty.
        POLL_INTERVAL = 0.01

        # Blocks the calling thread, yielding every truthy item popped from
        # +queue+ to +block+. Returns once #stop has been called (from the
        # block itself or from another thread).
        #
        # @param queue [#pop] source of messages; +pop+ must return a falsy
        #   value when nothing is available.
        # @yieldparam item [Object] the popped message.
        def listen(queue, &block)
          @stopping = false

          loop do
            break if @stopping
            item = queue.pop
            if item
              block.call item
            else
              # Nothing to do: back off briefly instead of spinning the CPU.
              sleep POLL_INTERVAL
            end
          end
        end

        # Asks a running #listen loop to finish; takes effect the next time
        # the loop checks the flag.
        def stop
          @stopping = true
        end

      end

    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
asynchronic-0.1.0 lib/asynchronic/queue_engine/in_memory.rb