lib/tamashii/agent/component.rb in tamashii-agent-0.1.11 vs lib/tamashii/agent/component.rb in tamashii-agent-0.2.0

- old
+ new

@@ -1,71 +1,73 @@ -require 'nio' require 'tamashii/agent/common' +require 'tamashii/agent/event' + module Tamashii module Agent class Component include Common::Loggable def initialize - @pipe_r, @pipe_w = IO.pipe + @event_queue = Queue.new end - def send_event(type, body) - str = [type, body.bytesize].pack("Cn") + body - @pipe_w.write(str) + def send_event(event) + @event_queue.push(event) end - def receive_event - ev_type, ev_size = @pipe_r.read(3).unpack("Cn") - ev_body = @pipe_r.read(ev_size) - process_event(ev_type, ev_body) + def check_new_event(non_block = false) + @event_queue.pop(non_block) + rescue ThreadError => e + nil end + + def handle_new_event(non_block = false) + if ev = check_new_event(non_block) + process_event(ev) + end + ev + end - def process_event(ev_type, ev_body) - logger.debug "Got event: #{ev_type}, #{ev_body}" + def process_event(event) + logger.debug "Got event: #{event.type}, #{event.body}" end # worker def run - @thr = Thread.start { run_worker_loop } + @worker_thr = Thread.start { run_worker_loop } end def run! run_worker_loop end def stop logger.info "Stopping component" - @thr.exit if @thr - @thr = nil + stop_threads clean_up end + def stop_threads + @worker_thr.exit if @worker_thr + @worker_thr = nil + end + def clean_up end def run_worker_loop - create_selector - register_event_io worker_loop end # a default implementation def worker_loop loop do - ready = @selector.select - ready.each { |m| m.value.call } if ready + if !handle_new_event + logger.error "Thread error. Worker loop terminated" + break + end end - end - - def register_event_io - _monitor = @selector.register(@pipe_r, :r) - _monitor.value = method(:receive_event) - end - - def create_selector - @selector = NIO::Selector.new end end end end