# -*- encoding: binary -*- # Copyright (C) 2013, Eric Wong and all contributors # License: GPLv3 or later (https://www.gnu.org/licenses/gpl-3.0.txt) class Yahns::Queue < SleepyPenguin::Epoll::IO # :nodoc: include SleepyPenguin attr_accessor :fdmap # Yahns::Fdmap # public QEV_RD = Epoll::IN | Epoll::ONESHOT QEV_WR = Epoll::OUT | Epoll::ONESHOT QEV_RDWR = QEV_RD | QEV_WR def self.new super(SleepyPenguin::Epoll::CLOEXEC) end # for HTTP and HTTPS servers, we rely on the io writing to us, first # flags: QEV_RD/QEV_WR (usually QEV_RD) def queue_add(io, flags) @fdmap.add(io) epoll_ctl(Epoll::CTL_ADD, io, flags) end # returns an array of infinitely running threads def spawn_worker_threads(logger, worker_threads, max_events) worker_threads.times do Thread.new do Thread.current[:yahns_rbuf] = "" begin epoll_wait(max_events) do |_, io| # don't care for flags for now case rv = io.yahns_step when :wait_readable epoll_ctl(Epoll::CTL_MOD, io, QEV_RD) when :wait_writable epoll_ctl(Epoll::CTL_MOD, io, QEV_WR) when :wait_readwrite epoll_ctl(Epoll::CTL_MOD, io, QEV_RDWR) when :delete # only used by rack.hijack epoll_ctl(Epoll::CTL_DEL, io, 0) @fdmap.delete(io) when nil # this is be the ONLY place where we call IO#close on # things inside the queue io.close @fdmap.decr else raise "BUG: #{io.inspect}#yahns_step returned: #{rv.inspect}" end end rescue => e break if (IOError === e || Errno::EBADF === e) && closed? Yahns::Log.exception(logger, 'queue loop', e) end while true end end end end