Sha256: b25e90ec13484bc4fdfd5e816a6a2436ac4b57ad686c49041526b607bc6dd3c2

Contents?: true

Size: 1.34 KB

Versions: 8

Compression:

Stored size: 1.34 KB

Contents

module Invoker
  class Reactor
    attr_accessor :monitored_fds
    def initialize
      @monitored_fds = []
    end

    def add_to_monitor(fd)
      @monitored_fds << fd
    end

    def remove_from_monitoring(fd)
      @monitored_fds.delete(fd)
    end

    def watch_on_pipe
      ready_read_fds,ready_write_fds,read_error_fds = select(monitored_fds,[],[],0.05)

      if ready_read_fds && !ready_read_fds.empty?
        handle_read_event(ready_read_fds)
      end
    end

    def handle_read_event(ready_read_fds)
      ready_fds = ready_read_fds.flatten.compact
      ready_fds.each {|ready_fd| process_read(ready_fd) }
    end

    def process_read(ready_fd)
      command_worker = Invoker::COMMANDER.get_worker_from_fd(ready_fd)
      begin
        data = read_data(ready_fd)
        command_worker.receive_data(data)
      rescue Invoker::Errors::ProcessTerminated
        remove_from_monitoring(command_worker.pipe_end)
        command_worker.unbind()
      end
    end

    def read_data(ready_fd)
      sock_data = []
      begin
        while(t_data = ready_fd.read_nonblock(64))
          sock_data << t_data
        end
      rescue Errno::EAGAIN
        return sock_data.join
      rescue Errno::EWOULDBLOCK
        return sock_data.join
      rescue
        raise Invoker::Errors::ProcessTerminated.new(ready_fd,sock_data.join)
      end
    end

  end
end

Version data entries

8 entries across 8 versions & 1 rubygems

Version Path
invoker-1.0.4 lib/invoker/reactor.rb
invoker-1.0.3 lib/invoker/reactor.rb
invoker-1.0.2 lib/invoker/reactor.rb
invoker-1.0.1 lib/invoker/reactor.rb
invoker-1.0.0 lib/invoker/reactor.rb
invoker-0.1.2 lib/invoker/reactor.rb
invoker-0.1.1 lib/invoker/reactor.rb
invoker-0.1.1.pre lib/invoker/reactor.rb