lib/daybreak/queue.rb in daybreak-0.2.0 vs lib/daybreak/queue.rb in daybreak-0.2.1

- old
+ new

@@ -1,107 +1,5 @@ -module Daybreak - # Thread safe job queue - # @api private - class Queue - # HACK: Dangerous optimization on MRI which has a - # global interpreter lock and makes the @queue array - # thread safe. - if !defined?(RUBY_ENGINE) || RUBY_ENGINE == 'ruby' - def initialize - @queue, @full, @empty = [], [], [] - @stop = false - @heartbeat = Thread.new(&method(:heartbeat)) - @heartbeat.priority = -9 - end - - def <<(x) - @queue << x - thread = @full.first - thread.wakeup if thread - end - - def pop - @queue.shift - if @queue.empty? - thread = @empty.first - thread.wakeup if thread - end - end - - def next - while @queue.empty? - begin - @full << Thread.current - # If a push happens before Thread.stop, the thread won't be woken up - Thread.stop while @queue.empty? - ensure - @full.delete(Thread.current) - end - end - @queue.first - end - - def flush - until @queue.empty? - begin - @empty << Thread.current - # If a pop happens before Thread.stop, the thread won't be woken up - Thread.stop until @queue.empty? - ensure - @empty.delete(Thread.current) - end - end - end - - def stop - @stop = true - @heartbeat.join - end - - private - - # Check threads 10 times per second to avoid deadlocks - # since there is a race condition below - def heartbeat - until @stop - @empty.each(&:wakeup) - @full.each(&:wakeup) - sleep 0.1 - end - end - else - def initialize - @mutex = Mutex.new - @full = ConditionVariable.new - @empty = ConditionVariable.new - @queue = [] - end - - def <<(x) - @mutex.synchronize do - @queue << x - @full.signal - end - end - - def pop - @mutex.synchronize do - @queue.shift - @empty.signal if @queue.empty? - end - end - - def next - @mutex.synchronize do - @full.wait(@mutex) while @queue.empty? - @queue.first - end - end - - def flush - @mutex.synchronize do - @empty.wait(@mutex) until @queue.empty? - end - end - end - end +if !defined?(RUBY_ENGINE) || RUBY_ENGINE == 'ruby' + require 'daybreak/queue/mri' +else + require 'daybreak/queue/threaded' end