lib/rainbows/thread_pool.rb in rainbows-0.6.0 vs lib/rainbows/thread_pool.rb in rainbows-0.7.0

- old
+ new

@@ -25,45 +25,47 @@ include Base def worker_loop(worker) init_worker_process(worker) - pool = (1..worker_connections).map { new_worker_thread } + pool = (1..worker_connections).map do + Thread.new { LISTENERS.size == 1 ? sync_worker : async_worker } + end while G.alive # if any worker dies, something is serious wrong, bail pool.each do |thr| - G.tick + G.tick or break thr.join(1) and G.quit! end end join_threads(pool) end - def new_worker_thread - Thread.new { - begin - begin - # TODO: check if select() or accept() is a problem on large - # SMP systems under Ruby 1.9. Hundreds of native threads - # all working off the same socket could be a thundering herd - # problem. On the other hand, a thundering herd may not - # even incur as much overhead as an extra Mutex#synchronize - ret = IO.select(LISTENERS, nil, nil, 1) and - ret.first.each do |sock| - begin - process_client(sock.accept_nonblock) - rescue Errno::EAGAIN, Errno::ECONNABORTED - end - end - rescue Errno::EINTR - rescue Errno::EBADF, TypeError - break - end - rescue Object => e - listen_loop_error(e) - end while G.alive - } + def sync_worker + s = LISTENERS.first + begin + process_client(s.accept) + rescue Errno::EINTR, Errno::ECONNABORTED + rescue => e + Error.listen_loop(e) + end while G.alive + end + + def async_worker + begin + # TODO: check if select() or accept() is a problem on large + # SMP systems under Ruby 1.9. Hundreds of native threads + # all working off the same socket could be a thundering herd + # problem. On the other hand, a thundering herd may not + # even incur as much overhead as an extra Mutex#synchronize + ret = IO.select(LISTENERS, nil, nil, 1) and ret.first.each do |s| + s = Rainbows.accept(s) and process_client(s) + end + rescue Errno::EINTR + rescue => e + Error.listen_loop(e) + end while G.alive end end end