lib/zold/thread_pool.rb in zold-0.18.8 vs lib/zold/thread_pool.rb in zold-0.18.9
- old
+ new
@@ -42,28 +42,34 @@
def run(threads, set = (0..threads - 1).to_a)
raise "Number of threads #{threads} has to be positive" unless threads.positive?
idx = Concurrent::AtomicFixnum.new
mutex = Mutex.new
list = set.dup
- [threads, set.count].min.times do
+ total = [threads, set.count].min
+ latch = Concurrent::CountDownLatch.new(total)
+ total.times do |i|
add do
+ Thread.current.name = "#{@title}-#{i}"
loop do
r = mutex.synchronize { list.pop }
break if r.nil?
yield(r, idx.increment - 1)
end
+ ensure
+ latch.count_down
end
end
- @threads.each(&:join)
- @threads.clear
+ latch.wait
+ kill
end
# Add a new thread
def add
raise 'Block must be given to start()' unless block_given?
latch = Concurrent::CountDownLatch.new(1)
thread = Thread.start do
+ Thread.current.name = @title
VerboseThread.new(@log).run do
latch.count_down
yield
end
end
@@ -73,25 +79,32 @@
(Thread.current.thread_variable_get(:kids) || []) + [thread]
)
@threads << thread
end
+ def join(sec)
+ @threads.each { |t| t.join(sec) }
+ end
+
# Kill them all immediately and close the pool
def kill
if @threads.empty?
@log.debug("Thread pool \"#{@title}\" terminated with no threads")
return
end
- @log.info("Stopping \"#{@title}\" thread pool with #{@threads.count} threads: \
+ @log.debug("Stopping \"#{@title}\" thread pool with #{@threads.count} threads: \
#{@threads.map { |t| "#{t.name}/#{t.status}" }.join(', ')}...")
start = Time.new
@threads.each do |t|
(t.thread_variable_get(:kids) || []).each(&:kill)
t.kill
raise "Failed to join the thread \"#{t.name}\" in \"#{@title}\" pool" unless t.join(0.1)
- (Thread.current.thread_variable_get(:kids) || []).delete(t)
+ Thread.current.thread_variable_set(
+ :kids,
+ (Thread.current.thread_variable_get(:kids) || []) - [t]
+ )
end
- @log.info("Thread pool \"#{@title}\" terminated all threads in #{Age.new(start)}, \
+ @log.debug("Thread pool \"#{@title}\" terminated all threads in #{Age.new(start)}, \
it was alive for #{Age.new(@start)}: #{@threads.map { |t| "#{t.name}/#{t.status}" }.join(', ')}")
@threads.clear
end
# How many threads are in there