lib/yahns/queue_epoll.rb in yahns-0.0.0 vs lib/yahns/queue_epoll.rb in yahns-0.0.1
- old
+ new
@@ -20,38 +20,36 @@
@fdmap.add(io)
epoll_ctl(Epoll::CTL_ADD, io, flags)
end
# returns an array of infinitely running threads
- def spawn_worker_threads(logger, worker_threads, max_events)
- worker_threads.times do
- Thread.new do
- Thread.current[:yahns_rbuf] = ""
- begin
- epoll_wait(max_events) do |_, io| # don't care for flags for now
- case rv = io.yahns_step
- when :wait_readable
- epoll_ctl(Epoll::CTL_MOD, io, QEV_RD)
- when :wait_writable
- epoll_ctl(Epoll::CTL_MOD, io, QEV_WR)
- when :wait_readwrite
- epoll_ctl(Epoll::CTL_MOD, io, QEV_RDWR)
- when :delete # only used by rack.hijack
- epoll_ctl(Epoll::CTL_DEL, io, 0)
- @fdmap.delete(io)
- when nil
- # this is be the ONLY place where we call IO#close on
- # things inside the queue
- io.close
- @fdmap.decr
- else
- raise "BUG: #{io.inspect}#yahns_step returned: #{rv.inspect}"
- end
+ def worker_thread(logger, max_events)
+ Thread.new do
+ Thread.current[:yahns_rbuf] = ""
+ begin
+ epoll_wait(max_events) do |_, io| # don't care for flags for now
+ case rv = io.yahns_step
+ when :wait_readable
+ epoll_ctl(Epoll::CTL_MOD, io, QEV_RD)
+ when :wait_writable
+ epoll_ctl(Epoll::CTL_MOD, io, QEV_WR)
+ when :wait_readwrite
+ epoll_ctl(Epoll::CTL_MOD, io, QEV_RDWR)
+ when :ignore # only used by rack.hijack
+ @fdmap.decr
+ when nil
+ # this is be the ONLY place where we call IO#close on
+ # things inside the queue
+ io.close
+ @fdmap.decr
+ else
+ raise "BUG: #{io.inspect}#yahns_step returned: #{rv.inspect}"
end
- rescue => e
- break if (IOError === e || Errno::EBADF === e) && closed?
- Yahns::Log.exception(logger, 'queue loop', e)
- end while true
- end
+ end
+ rescue => e
+ # sleep since this check is racy (and uncommon)
+ break if closed? || (sleep(0.01) && closed?)
+ Yahns::Log.exception(logger, 'queue loop', e)
+ end while true
end
end
end