lib/fluent/plugin/in_monitor_agent.rb in fluentd-0.12.16 vs lib/fluent/plugin/in_monitor_agent.rb in fluentd-0.12.17
- old
+ new
@@ -25,10 +25,12 @@
super
end
config_param :bind, :string, :default => '0.0.0.0'
config_param :port, :integer, :default => 24220
+ config_param :tag, :string, :default => nil
+ config_param :emit_interval, :time, :default => 60
class MonitorServlet < WEBrick::HTTPServlet::AbstractServlet
def initialize(server, agent)
@agent = agent
end
@@ -197,10 +199,40 @@
result = build_object(req, res)
render_json(result)
end
end
+ class TimerWatcher < Coolio::TimerWatcher
+ def initialize(interval, log, &callback)
+ @callback = callback
+ @log = log
+
+ # Avoid long shutdown time
+ @num_call = 0
+ if interval >= 10
+ min_interval = 10
+ @call_interval = interval / 10
+ else
+ min_interval = interval
+ @call_interval = 0
+ end
+
+ super(min_interval, true)
+ end
+
+ def on_timer
+ @num_call += 1
+ if @num_call >= @call_interval
+ @num_call = 0
+ @callback.call
+ end
+ rescue => e
+ @log.error e.to_s
+ @log.error_backtrace
+ end
+ end
+
def start
log.debug "listening monitoring http server on http://#{@bind}:#{@port}/api/plugins"
@srv = WEBrick::HTTPServer.new({
:BindAddress => @bind,
:Port => @port,
@@ -212,21 +244,51 @@
@srv.mount('/api/config', LTSVConfigMonitorServlet, self)
@srv.mount('/api/config.json', JSONConfigMonitorServlet, self)
@thread = Thread.new {
@srv.start
}
+ if @tag
+ log.debug "tag parameter is specified. Emit plugins info to '#{@tag}'"
+
+ @loop = Coolio::Loop.new
+ opts = {:with_config => false}
+ timer = TimerWatcher.new(@emit_interval, log) {
+ es = MultiEventStream.new
+ now = Engine.now
+ plugins_info_all(opts).each { |record|
+ es.add(now, record)
+ }
+ router.emit_stream(@tag, es)
+ }
+ @loop.attach(timer)
+ @thread_for_emit = Thread.new(&method(:run))
+ end
end
+ def run
+ @loop.run
+ rescue => e
+ log.error "unexpected error", :error => e.to_s
+ log.error_backtrace
+ end
+
def shutdown
if @srv
@srv.shutdown
@srv = nil
end
if @thread
@thread.join
@thread = nil
end
+ if @tag
+ @loop.watchers.each { |w| w.detach }
+ @loop.stop
+ @loop = nil
+ @thread_for_emit.join
+ @thread_for_emit = nil
+ end
end
MONITOR_INFO = {
'output_plugin' => 'is_a?(::Fluent::Output)', # deprecated. Use plugin_category instead
'buffer_queue_length' => '@buffer.queue_size',
@@ -318,10 +380,10 @@
# Common plugin information
obj['plugin_id'] = pe.plugin_id
obj['plugin_category'] = plugin_category(pe)
obj['type'] = pe.config['@type'] || pe.config['type']
- obj['config'] = pe.config
+ obj['config'] = pe.config if !opts.has_key?(:with_config) || opts[:with_config]
# run MONITOR_INFO in plugins' instance context and store the info to obj
MONITOR_INFO.each_pair {|key,code|
begin
obj[key] = pe.instance_eval(code)