lib/zold/thread_pool.rb in zold-0.18.8 vs lib/zold/thread_pool.rb in zold-0.18.9

- old
+ new

@@ -42,28 +42,34 @@ def run(threads, set = (0..threads - 1).to_a) raise "Number of threads #{threads} has to be positive" unless threads.positive? idx = Concurrent::AtomicFixnum.new mutex = Mutex.new list = set.dup - [threads, set.count].min.times do + total = [threads, set.count].min + latch = Concurrent::CountDownLatch.new(total) + total.times do |i| add do + Thread.current.name = "#{@title}-#{i}" loop do r = mutex.synchronize { list.pop } break if r.nil? yield(r, idx.increment - 1) end + ensure + latch.count_down end end - @threads.each(&:join) - @threads.clear + latch.wait + kill end # Add a new thread def add raise 'Block must be given to start()' unless block_given? latch = Concurrent::CountDownLatch.new(1) thread = Thread.start do + Thread.current.name = @title VerboseThread.new(@log).run do latch.count_down yield end end @@ -73,25 +79,32 @@ (Thread.current.thread_variable_get(:kids) || []) + [thread] ) @threads << thread end + def join(sec) + @threads.each { |t| t.join(sec) } + end + # Kill them all immediately and close the pool def kill if @threads.empty? @log.debug("Thread pool \"#{@title}\" terminated with no threads") return end - @log.info("Stopping \"#{@title}\" thread pool with #{@threads.count} threads: \ + @log.debug("Stopping \"#{@title}\" thread pool with #{@threads.count} threads: \ #{@threads.map { |t| "#{t.name}/#{t.status}" }.join(', ')}...") start = Time.new @threads.each do |t| (t.thread_variable_get(:kids) || []).each(&:kill) t.kill raise "Failed to join the thread \"#{t.name}\" in \"#{@title}\" pool" unless t.join(0.1) - (Thread.current.thread_variable_get(:kids) || []).delete(t) + Thread.current.thread_variable_set( + :kids, + (Thread.current.thread_variable_get(:kids) || []) - [t] + ) end - @log.info("Thread pool \"#{@title}\" terminated all threads in #{Age.new(start)}, \ + @log.debug("Thread pool \"#{@title}\" terminated all threads in #{Age.new(start)}, \ it was alive for #{Age.new(@start)}: #{@threads.map { |t| "#{t.name}/#{t.status}" }.join(', ')}") @threads.clear end # How many threads are in there