Sha256: 4645064d557601215db73debc66479d88886d6d10dc19d08b1d2653323657d12

Contents?: true

Size: 1.34 KB

Versions: 6

Compression:

Stored size: 1.34 KB

Contents

module Asynchronic
  module QueueEngine
    class InMemory

      attr_reader :default_queue

      def initialize(options={})
        @default_queue = options[:default_queue]
        @queues ||= Hash.new { |h,k| h[k] = Queue.new }
      end

      def default_queue
        @default_queue ||= Asynchronic.default_queue
      end

      def [](name)
        @queues[name]
      end

      def queues
        @queues.keys.map(&:to_sym)
      end

      def clear
        @queues.clear
      end

      def listener
        Listener.new
      end

      def asynchronic?
        true
      end

      def active_connections
        [Asynchronic.connection_name]
      end


      class Queue

        extend Forwardable

        def_delegators :@queue, :size, :empty?, :to_a

        def initialize
          @queue = []
          @mutex = Mutex.new
        end

        def pop
          @mutex.synchronize { @queue.shift }
        end

        def push(message)
          @mutex.synchronize { @queue.push message }
        end

      end


      class Listener

        def listen(queue, &block)
          @stopping = false

          loop do
            break if @stopping
            item = queue.pop
            next unless item
            block.call item
          end
        end

        def stop
          @stopping = true
        end

      end

    end
  end
end

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
asynchronic-3.0.3 lib/asynchronic/queue_engine/in_memory.rb
asynchronic-3.0.2 lib/asynchronic/queue_engine/in_memory.rb
asynchronic-3.0.1 lib/asynchronic/queue_engine/in_memory.rb
asynchronic-3.0.0 lib/asynchronic/queue_engine/in_memory.rb
asynchronic-2.0.1 lib/asynchronic/queue_engine/in_memory.rb
asynchronic-2.0.0 lib/asynchronic/queue_engine/in_memory.rb