Sha256: cd8317541e9e1a5e10cf05d7ae0c3c190ee5b999dc615dfbbd9873a10114612f

Contents?: true

Size: 1.45 KB

Versions: 2

Compression:

Stored size: 1.45 KB

Contents

module Plux
  class Reactor
    def initialize(count, worker)
      @worker = worker
      @msg_q = Queue.new
      @count = count

      @nio = NIO::Selector.new
      @newly_accepted = Queue.new
      @closed = []

      receive
      process
    end

    def register(socket)
      @newly_accepted << socket
      @nio.wakeup
    end

    private

    def receive
      Thread.new do
        loop do
          @closed.size.times{ @nio.deregister(@closed.pop) }

          @newly_accepted.size.times do
            socket = @newly_accepted.pop
            mon = @nio.register(socket, :r)
            mon.value = Worker.new(socket, @msg_q)
          end

          @nio.select do |m|
            next if m.value.process
            @closed << m.io
          end
        end
      end
    end

    def process
      @count.times.each do
        Thread.new do
          loop{ @worker.process(@msg_q.deq) }
        end
      end
    end

    class Worker
      def initialize(socket, q)
        @parser = Parser.new
        @socket = socket
        @q = q
      end

      def process
        stream = @socket.read_nonblock(Parser::STREAM_MAX_LEN, exception: false)
        return true if stream == :wait_readable

        msgs = @parser.decode(stream)
        last_msg = msgs.pop

        msgs.each{ |msg| @q << msg }
        if last_msg == Parser::LAST_MSG
          @socket.close
          return false
        end
        @q << last_msg

        true
      end
    end

  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
plux-0.1.8 lib/plux/reactor.rb
plux-0.1.7 lib/plux/reactor.rb