lib/tamashii/agent/component.rb in tamashii-agent-0.1.11 vs lib/tamashii/agent/component.rb in tamashii-agent-0.2.0
- old
+ new
@@ -1,71 +1,73 @@
-require 'nio'
require 'tamashii/agent/common'
+require 'tamashii/agent/event'
+
module Tamashii
module Agent
class Component
include Common::Loggable
def initialize
- @pipe_r, @pipe_w = IO.pipe
+ @event_queue = Queue.new
end
- def send_event(type, body)
- str = [type, body.bytesize].pack("Cn") + body
- @pipe_w.write(str)
+ def send_event(event)
+ @event_queue.push(event)
end
- def receive_event
- ev_type, ev_size = @pipe_r.read(3).unpack("Cn")
- ev_body = @pipe_r.read(ev_size)
- process_event(ev_type, ev_body)
+ def check_new_event(non_block = false)
+ @event_queue.pop(non_block)
+ rescue ThreadError => e
+ nil
end
+
+ def handle_new_event(non_block = false)
+ if ev = check_new_event(non_block)
+ process_event(ev)
+ end
+ ev
+ end
- def process_event(ev_type, ev_body)
- logger.debug "Got event: #{ev_type}, #{ev_body}"
+ def process_event(event)
+ logger.debug "Got event: #{event.type}, #{event.body}"
end
# worker
def run
- @thr = Thread.start { run_worker_loop }
+ @worker_thr = Thread.start { run_worker_loop }
end
def run!
run_worker_loop
end
def stop
logger.info "Stopping component"
- @thr.exit if @thr
- @thr = nil
+ stop_threads
clean_up
end
+ def stop_threads
+ @worker_thr.exit if @worker_thr
+ @worker_thr = nil
+ end
+
def clean_up
end
def run_worker_loop
- create_selector
- register_event_io
worker_loop
end
# a default implementation
def worker_loop
loop do
- ready = @selector.select
- ready.each { |m| m.value.call } if ready
+ if !handle_new_event
+ logger.error "Thread error. Worker loop terminated"
+ break
+ end
end
- end
-
- def register_event_io
- _monitor = @selector.register(@pipe_r, :r)
- _monitor.value = method(:receive_event)
- end
-
- def create_selector
- @selector = NIO::Selector.new
end
end
end
end