lib/invoker/reactor.rb in invoker-1.0.4 vs lib/invoker/reactor.rb in invoker-1.1.0
- old
+ new
@@ -1,56 +1,37 @@
module Invoker
class Reactor
- attr_accessor :monitored_fds
+ attr_accessor :reader
+
def initialize
- @monitored_fds = []
+ @reader = Invoker::Reactor::Reader.new
end
- def add_to_monitor(fd)
- @monitored_fds << fd
+ def watch_for_read(fd)
+ reader.watch_for_read(fd)
end
- def remove_from_monitoring(fd)
- @monitored_fds.delete(fd)
+ # Writes data to client socket and raises error if errors
+ # while writing
+ def send_data(socket, data)
+ socket.write(data)
+ rescue
+ raise Invoker::Errors::ClientDisconnected
end
- def watch_on_pipe
- ready_read_fds,ready_write_fds,read_error_fds = select(monitored_fds,[],[],0.05)
+ def monitor_for_fd_events
+ ready_read_fds, _ , _ = select(*options_for_select)
if ready_read_fds && !ready_read_fds.empty?
- handle_read_event(ready_read_fds)
+ reader.handle_read_event(ready_read_fds)
end
end
- def handle_read_event(ready_read_fds)
- ready_fds = ready_read_fds.flatten.compact
- ready_fds.each {|ready_fd| process_read(ready_fd) }
- end
+ private
- def process_read(ready_fd)
- command_worker = Invoker::COMMANDER.get_worker_from_fd(ready_fd)
- begin
- data = read_data(ready_fd)
- command_worker.receive_data(data)
- rescue Invoker::Errors::ProcessTerminated
- remove_from_monitoring(command_worker.pipe_end)
- command_worker.unbind()
- end
+ def options_for_select
+ [reader.read_array, [], [], 0.05]
end
-
- def read_data(ready_fd)
- sock_data = []
- begin
- while(t_data = ready_fd.read_nonblock(64))
- sock_data << t_data
- end
- rescue Errno::EAGAIN
- return sock_data.join
- rescue Errno::EWOULDBLOCK
- return sock_data.join
- rescue
- raise Invoker::Errors::ProcessTerminated.new(ready_fd,sock_data.join)
- end
- end
-
end
end
+
+require "invoker/reactor/reader"