lib/yahns/queue_epoll.rb in yahns-0.0.0 vs lib/yahns/queue_epoll.rb in yahns-0.0.1

- old
+ new

@@ -20,38 +20,36 @@ @fdmap.add(io) epoll_ctl(Epoll::CTL_ADD, io, flags) end # returns an array of infinitely running threads - def spawn_worker_threads(logger, worker_threads, max_events) - worker_threads.times do - Thread.new do - Thread.current[:yahns_rbuf] = "" - begin - epoll_wait(max_events) do |_, io| # don't care for flags for now - case rv = io.yahns_step - when :wait_readable - epoll_ctl(Epoll::CTL_MOD, io, QEV_RD) - when :wait_writable - epoll_ctl(Epoll::CTL_MOD, io, QEV_WR) - when :wait_readwrite - epoll_ctl(Epoll::CTL_MOD, io, QEV_RDWR) - when :delete # only used by rack.hijack - epoll_ctl(Epoll::CTL_DEL, io, 0) - @fdmap.delete(io) - when nil - # this is be the ONLY place where we call IO#close on - # things inside the queue - io.close - @fdmap.decr - else - raise "BUG: #{io.inspect}#yahns_step returned: #{rv.inspect}" - end + def worker_thread(logger, max_events) + Thread.new do + Thread.current[:yahns_rbuf] = "" + begin + epoll_wait(max_events) do |_, io| # don't care for flags for now + case rv = io.yahns_step + when :wait_readable + epoll_ctl(Epoll::CTL_MOD, io, QEV_RD) + when :wait_writable + epoll_ctl(Epoll::CTL_MOD, io, QEV_WR) + when :wait_readwrite + epoll_ctl(Epoll::CTL_MOD, io, QEV_RDWR) + when :ignore # only used by rack.hijack + @fdmap.decr + when nil + # this is be the ONLY place where we call IO#close on + # things inside the queue + io.close + @fdmap.decr + else + raise "BUG: #{io.inspect}#yahns_step returned: #{rv.inspect}" end - rescue => e - break if (IOError === e || Errno::EBADF === e) && closed? - Yahns::Log.exception(logger, 'queue loop', e) - end while true - end + end + rescue => e + # sleep since this check is racy (and uncommon) + break if closed? || (sleep(0.01) && closed?) + Yahns::Log.exception(logger, 'queue loop', e) + end while true end end end