lib/fluent/plugin/in_monitor_agent.rb in fluentd-0.12.16 vs lib/fluent/plugin/in_monitor_agent.rb in fluentd-0.12.17

- old
+ new

@@ -25,10 +25,12 @@ super end config_param :bind, :string, :default => '0.0.0.0' config_param :port, :integer, :default => 24220 + config_param :tag, :string, :default => nil + config_param :emit_interval, :time, :default => 60 class MonitorServlet < WEBrick::HTTPServlet::AbstractServlet def initialize(server, agent) @agent = agent end @@ -197,10 +199,40 @@ result = build_object(req, res) render_json(result) end end + class TimerWatcher < Coolio::TimerWatcher + def initialize(interval, log, &callback) + @callback = callback + @log = log + + # Avoid long shutdown time + @num_call = 0 + if interval >= 10 + min_interval = 10 + @call_interval = interval / 10 + else + min_interval = interval + @call_interval = 0 + end + + super(min_interval, true) + end + + def on_timer + @num_call += 1 + if @num_call >= @call_interval + @num_call = 0 + @callback.call + end + rescue => e + @log.error e.to_s + @log.error_backtrace + end + end + def start log.debug "listening monitoring http server on http://#{@bind}:#{@port}/api/plugins" @srv = WEBrick::HTTPServer.new({ :BindAddress => @bind, :Port => @port, @@ -212,21 +244,51 @@ @srv.mount('/api/config', LTSVConfigMonitorServlet, self) @srv.mount('/api/config.json', JSONConfigMonitorServlet, self) @thread = Thread.new { @srv.start } + if @tag + log.debug "tag parameter is specified. Emit plugins info to '#{@tag}'" + + @loop = Coolio::Loop.new + opts = {:with_config => false} + timer = TimerWatcher.new(@emit_interval, log) { + es = MultiEventStream.new + now = Engine.now + plugins_info_all(opts).each { |record| + es.add(now, record) + } + router.emit_stream(@tag, es) + } + @loop.attach(timer) + @thread_for_emit = Thread.new(&method(:run)) + end end + def run + @loop.run + rescue => e + log.error "unexpected error", :error => e.to_s + log.error_backtrace + end + def shutdown if @srv @srv.shutdown @srv = nil end if @thread @thread.join @thread = nil end + if @tag + @loop.watchers.each { |w| w.detach } + @loop.stop + @loop = nil + @thread_for_emit.join + @thread_for_emit = nil + end end MONITOR_INFO = { 'output_plugin' => 'is_a?(::Fluent::Output)', # deprecated. Use plugin_category instead 'buffer_queue_length' => '@buffer.queue_size', @@ -318,10 +380,10 @@ # Common plugin information obj['plugin_id'] = pe.plugin_id obj['plugin_category'] = plugin_category(pe) obj['type'] = pe.config['@type'] || pe.config['type'] - obj['config'] = pe.config + obj['config'] = pe.config if !opts.has_key?(:with_config) || opts[:with_config] # run MONITOR_INFO in plugins' instance context and store the info to obj MONITOR_INFO.each_pair {|key,code| begin obj[key] = pe.instance_eval(code)