Sha256: 7c802d1bca6c5b179776029116cc129147a9dd45faa6f3d644874b9bf054d43e
Contents?: true
Size: 1.16 KB
Versions: 1
Compression:
Stored size: 1.16 KB
Contents
require 'forwardable'

module Asynchronic
  module QueueEngine

    # Queue engine that keeps all of its queues in the memory of the
    # current process. Queues are created on first access by name.
    class InMemory

      # Name of the queue used when the caller does not pick one.
      attr_reader :default_queue

      # options - Hash of settings:
      #           :default_queue - name of the default queue. When absent,
      #                            Asynchronic.default_queue is used.
      def initialize(options={})
        # Block form of fetch: the global default is only looked up when the
        # caller did not provide one.
        @default_queue = options.fetch(:default_queue) { Asynchronic.default_queue }
        # Unknown names get a fresh Queue that is stored under that name.
        @queues = Hash.new { |h,k| h[k] = Queue.new }
      end

      # Returns the Queue registered under +name+, creating it if needed.
      def [](name)
        @queues[name]
      end

      # Returns the names of every queue accessed so far, as Symbols.
      def queues
        @queues.keys.map(&:to_sym)
      end

      # Forgets every queue (and therefore every message still stored).
      def clear
        @queues.clear
      end

      # Returns a new Listener that can consume one of this engine's queues.
      def listener
        Listener.new
      end

      # FIFO list of messages whose push/pop/to_a are guarded by a Mutex.
      class Queue

        extend Forwardable

        def_delegators :@queue, :size, :empty?

        def initialize
          @queue = []
          @mutex = Mutex.new
        end

        # Removes and returns the oldest message, or nil when the queue is
        # empty. Never blocks.
        def pop
          @mutex.synchronize { @queue.shift }
        end

        # Appends +message+ to the end of the queue.
        def push(message)
          @mutex.synchronize { @queue.push message }
        end

        # Returns a snapshot Array of the pending messages, oldest first.
        # A copy is returned (Array#to_a would hand out the internal array
        # itself), so callers cannot modify the queue behind the mutex.
        def to_a
          @mutex.synchronize { @queue.dup }
        end

      end

      # Polls a queue and hands every message to a block until stopped.
      class Listener

        # Seconds to wait before polling again after finding the queue empty,
        # so an idle listener does not spin at full CPU speed.
        IDLE_INTERVAL = 0.001

        # Runs the polling loop on the calling thread until #stop is called
        # (from the block or from another thread).
        #
        # queue - object responding to #pop that returns nil/false when
        #         there is nothing to consume.
        # block - called with each message popped from the queue.
        #
        # Note: the stop flag is reset on entry, so a #stop issued before
        # #listen starts has no effect.
        def listen(queue, &block)
          @stopping = false

          loop do
            break if @stopping

            item = queue.pop
            unless item
              sleep IDLE_INTERVAL
              next
            end

            block.call item
          end
        end

        # Asks the loop started by #listen to finish. The loop exits the next
        # time it checks the flag, i.e. after the current message (if any)
        # has been handled.
        def stop
          @stopping = true
        end

      end

    end
  end
end
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
asynchronic-0.1.0 | lib/asynchronic/queue_engine/in_memory.rb |