deliver/lib/deliver/queue_worker.rb in fastlane-2.163.0 vs deliver/lib/deliver/queue_worker.rb in fastlane-2.164.0

- old
+ new

@@ -4,61 +4,46 @@ # This dispatches jobs to worker threads and make it work in parallel. # It's suitable for I/O bounds works and not for CPU bounds works. # Use this when you have all the items that you'll process in advance. # Simply enqueue them to this and call `QueueWorker#start`. class QueueWorker + NUMBER_OF_THREADS = Helper.test? ? 1 : [ENV.fetch("DELIVER_NUMBER_OF_THREADS", 10).to_i, 10].min + # @param concurrency (Numeric) - A number of threads to be created # @param block (Proc) - A task you want to execute with enqueued items - def initialize(concurrency, &block) + def initialize(concurrency = NUMBER_OF_THREADS, &block) @concurrency = concurrency @block = block @queue = Queue.new end # @param job (Object) - An arbitary object that keeps parameters def enqueue(job) @queue.push(job) end + # @param jobs (Array<Object>) - An array of arbitary object that keeps parameters + def batch_enqueue(jobs) + raise(ArgumentError, "Enqueue Array instead of #{jobs.class}") unless jobs.kind_of?(Array) + jobs.each { |job| enqueue(job) } + end + # Call this after you enqueuned all the jobs you want to process # This method blocks current thread until all the enqueued jobs are processed def start + @queue.close + threads = [] @concurrency.times do threads << Thread.new do - while running? && !empty? + job = @queue.pop + while job + @block.call(job) job = @queue.pop - @block.call(job) if job end end end - wait_for_complete threads.each(&:join) - end - - private - - def running? - !@queue.closed? - end - - def empty? - @queue.empty? - end - - def wait_for_complete - wait_thread = Thread.new do - loop do - if @queue.empty? - @queue.close - break - end - - sleep(1) - end - end - - wait_thread.join end end end