deliver/lib/deliver/queue_worker.rb in fastlane-2.163.0 vs deliver/lib/deliver/queue_worker.rb in fastlane-2.164.0
- old
+ new
@@ -4,61 +4,46 @@
# This dispatches jobs to worker threads and makes them run in parallel.
# It's suitable for I/O-bound work, not for CPU-bound work.
# Use this when you have all the items that you'll process in advance.
# Simply enqueue them to this and call `QueueWorker#start`.
class QueueWorker
+ NUMBER_OF_THREADS = Helper.test? ? 1 : [ENV.fetch("DELIVER_NUMBER_OF_THREADS", 10).to_i, 10].min
+
# @param concurrency (Numeric) - The number of threads to create
# @param block (Proc) - A task you want to execute with enqueued items
- def initialize(concurrency, &block)
+ def initialize(concurrency = NUMBER_OF_THREADS, &block)
@concurrency = concurrency
@block = block
@queue = Queue.new
end
# @param job (Object) - An arbitrary object that holds parameters
def enqueue(job)
@queue.push(job)
end
+ # @param jobs (Array<Object>) - An array of arbitrary objects that hold parameters
+ def batch_enqueue(jobs)
+ raise(ArgumentError, "Enqueue Array instead of #{jobs.class}") unless jobs.kind_of?(Array)
+ jobs.each { |job| enqueue(job) }
+ end
+
# Call this after you have enqueued all the jobs you want to process
# This method blocks the current thread until all the enqueued jobs are processed
def start
+ @queue.close
+
threads = []
@concurrency.times do
threads << Thread.new do
- while running? && !empty?
+ job = @queue.pop
+ while job
+ @block.call(job)
job = @queue.pop
- @block.call(job) if job
end
end
end
- wait_for_complete
threads.each(&:join)
- end
-
- private
-
- def running?
- !@queue.closed?
- end
-
- def empty?
- @queue.empty?
- end
-
- def wait_for_complete
- wait_thread = Thread.new do
- loop do
- if @queue.empty?
- @queue.close
- break
- end
-
- sleep(1)
- end
- end
-
- wait_thread.join
end
end
end
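
The key behavioural change in 2.164.0 is that `start` closes the queue up front instead of polling it from a `wait_for_complete` thread. Because every job is enqueued before `start` is called, Ruby's `Queue#pop` on a closed queue keeps handing out the remaining items and then returns `nil` once the queue is empty, which is what ends each worker's `while job` loop. A minimal standalone sketch of that stdlib behaviour (not fastlane code, just the mechanism):

queue = Queue.new
3.times { |i| queue.push(i) }
queue.close                        # no further pushes; pop no longer blocks forever

workers = 2.times.map do
  Thread.new do
    # On a closed queue, pop returns the queued items until it's empty, then nil,
    # so the loop condition doubles as the shutdown signal.
    while (job = queue.pop)
      puts "processed #{job}"
    end
  end
end
workers.each(&:join)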
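
And a small usage sketch against the 2.164.0 API shown above, assuming some I/O-bound task (`upload_screenshot` here is made up for illustration):

# Concurrency defaults to NUMBER_OF_THREADS (DELIVER_NUMBER_OF_THREADS, capped at 10).
worker = Deliver::QueueWorker.new do |path|
  upload_screenshot(path)          # hypothetical I/O-bound job handler
end

# Enqueue everything up front, one at a time or as an array...
worker.batch_enqueue(["1.png", "2.png", "3.png"])

# ...then block until the worker threads have processed every job.
worker.start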