lib/liveqa/processor/worker.rb in liveqa-1.9.0 vs lib/liveqa/processor/worker.rb in liveqa-1.9.1
- old
+ new
@@ -3,21 +3,22 @@
class Worker
DEFAULT_WAIT_TIME = 2
attr_reader :queue
- attr_reader :lock
attr_reader :batches
+ attr_reader :executor
- def initialize(queue)
+ def initialize(queue, executor)
@queue = queue
- @lock = Mutex.new
@batches = []
+
+ @executor = executor
end
def run
- until Thread.current[:should_exit]
+ while running?
return if queue.empty? && batches.empty?
sleep(DEFAULT_WAIT_TIME) if queue.empty?
create_new_batch unless queue.empty?
@@ -26,14 +27,17 @@
end
end
private
+ def running?
+ return true if !queue.empty? || !batches.empty?
+ executor.running?
+ end
+
def create_new_batch
batch = Batch.new
- lock.synchronize do
- batch << queue.pop until batch.full? || queue.empty?
- end
+ batch << queue.pop until batch.full? || queue.empty?
batches.push(batch)
end
def send_batches
batches.dup.each do