lib/liveqa/processor/worker.rb in liveqa-1.9.0 vs lib/liveqa/processor/worker.rb in liveqa-1.9.1

- old
+ new

@@ -3,21 +3,22 @@ class Worker DEFAULT_WAIT_TIME = 2 attr_reader :queue - attr_reader :lock attr_reader :batches + attr_reader :executor - def initialize(queue) + def initialize(queue, executor) @queue = queue - @lock = Mutex.new @batches = [] + + @executor = executor end def run - until Thread.current[:should_exit] + while running? return if queue.empty? && batches.empty? sleep(DEFAULT_WAIT_TIME) if queue.empty? create_new_batch unless queue.empty? @@ -26,14 +27,17 @@ end end private + def running? + return true if !queue.empty? || !batches.empty? + executor.running? + end + def create_new_batch batch = Batch.new - lock.synchronize do - batch << queue.pop until batch.full? || queue.empty? - end + batch << queue.pop until batch.full? || queue.empty? batches.push(batch) end def send_batches batches.dup.each do