lib/invoker/reactor.rb in invoker-1.0.4 vs lib/invoker/reactor.rb in invoker-1.1.0

- old
+ new

@@ -1,56 +1,37 @@ module Invoker class Reactor - attr_accessor :monitored_fds + attr_accessor :reader + def initialize - @monitored_fds = [] + @reader = Invoker::Reactor::Reader.new end - def add_to_monitor(fd) - @monitored_fds << fd + def watch_for_read(fd) + reader.watch_for_read(fd) end - def remove_from_monitoring(fd) - @monitored_fds.delete(fd) + # Writes data to client socket and raises error if errors + # while writing + def send_data(socket, data) + socket.write(data) + rescue + raise Invoker::Errors::ClientDisconnected end - def watch_on_pipe - ready_read_fds,ready_write_fds,read_error_fds = select(monitored_fds,[],[],0.05) + def monitor_for_fd_events + ready_read_fds, _ , _ = select(*options_for_select) if ready_read_fds && !ready_read_fds.empty? - handle_read_event(ready_read_fds) + reader.handle_read_event(ready_read_fds) end end - def handle_read_event(ready_read_fds) - ready_fds = ready_read_fds.flatten.compact - ready_fds.each {|ready_fd| process_read(ready_fd) } - end + private - def process_read(ready_fd) - command_worker = Invoker::COMMANDER.get_worker_from_fd(ready_fd) - begin - data = read_data(ready_fd) - command_worker.receive_data(data) - rescue Invoker::Errors::ProcessTerminated - remove_from_monitoring(command_worker.pipe_end) - command_worker.unbind() - end + def options_for_select + [reader.read_array, [], [], 0.05] end - - def read_data(ready_fd) - sock_data = [] - begin - while(t_data = ready_fd.read_nonblock(64)) - sock_data << t_data - end - rescue Errno::EAGAIN - return sock_data.join - rescue Errno::EWOULDBLOCK - return sock_data.join - rescue - raise Invoker::Errors::ProcessTerminated.new(ready_fd,sock_data.join) - end - end - end end + +require "invoker/reactor/reader"