lib/fluent/plugin/in_monitor_agent.rb in fluentd-0.10.35 vs lib/fluent/plugin/in_monitor_agent.rb in fluentd-0.10.36

- old
+ new

@@ -14,275 +14,273 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # module Fluent + class MonitorAgentInput < Input + Plugin.register_input('monitor_agent', self) -class MonitorAgentInput < Input - Plugin.register_input('monitor_agent', self) + require 'webrick' - require 'webrick' - - def initialize - require 'cgi' - super - end - - config_param :bind, :string, :default => '0.0.0.0' - config_param :port, :integer, :default => 24220 - - class MonitorServlet < WEBrick::HTTPServlet::AbstractServlet - def initialize(server, agent) - @agent = agent + def initialize + require 'cgi' + super end - def do_GET(req, res) - begin - code, header, body = process(req, res) - rescue - code, header, body = render_json_error(500, { - 'message '=> 'Internal Server Error', - 'error' => "#{$!}", - 'backgrace'=> $!.backtrace, - }) - end + config_param :bind, :string, :default => '0.0.0.0' + config_param :port, :integer, :default => 24220 - # set response code, header and body - res.status = code - header.each_pair {|k,v| - res[k] = v - } - res.body = body - end - - def build_object(req, res) - unless req.path_info == "" - return render_json_error(404, "Not found") + class MonitorServlet < WEBrick::HTTPServlet::AbstractServlet + def initialize(server, agent) + @agent = agent end - # parse ?=query string - if req.query_string + def do_GET(req, res) begin - qs = CGI.parse(req.query_string) + code, header, body = process(req, res) rescue - return render_json_error(400, "Invalid query string") + code, header, body = render_json_error(500, { + 'message '=> 'Internal Server Error', + 'error' => "#{$!}", + 'backgrace'=> $!.backtrace, + }) end - else - qs = Hash.new {|h,k| [] } - end - # if ?debug=1 is set, set :with_debug_info for get_monitor_info - # and :pretty_json for render_json_error - opts = {} - if s = qs['debug'] and s[0] - opts[:with_debug_info] = true - opts[:pretty_json] = true + # set response code, header and body + res.status = code + header.each_pair {|k,v| + res[k] = v + } + res.body = body end - if tags = qs['tag'] and tag = tags[0] - # ?tag= to search an output plugin by match pattern - if obj = @agent.plugin_info_by_tag(tag, opts) - list = [obj] - else - list = [] + def build_object(req, res) + unless req.path_info == "" + return render_json_error(404, "Not found") end - elsif plugin_ids = qs['id'] and plugin_id = plugin_ids[0] - # ?id= to search a plugin by 'id <plugin_id>' config param - if obj = @agent.plugin_info_by_id(plugin_id, opts) - list = [obj] + # parse ?=query string + if req.query_string + begin + qs = CGI.parse(req.query_string) + rescue + return render_json_error(400, "Invalid query string") + end else - list = [] + qs = Hash.new {|h,k| [] } end - elsif types = qs['type'] and type = types[0] - # ?type= to search plugins by 'type <type>' config param - list = @agent.plugins_info_by_type(type, opts) + # if ?debug=1 is set, set :with_debug_info for get_monitor_info + # and :pretty_json for render_json_error + opts = {} + if s = qs['debug'] and s[0] + opts[:with_debug_info] = true + opts[:pretty_json] = true + end - else - # otherwise show all plugins - list = @agent.plugins_info_all(opts) - end + if tags = qs['tag'] and tag = tags[0] + # ?tag= to search an output plugin by match pattern + if obj = @agent.plugin_info_by_tag(tag, opts) + list = [obj] + else + list = [] + end - return list, opts - end + elsif plugin_ids = qs['id'] and plugin_id = plugin_ids[0] + # ?id= to search a plugin by 'id <plugin_id>' config param + if obj = @agent.plugin_info_by_id(plugin_id, opts) + list = [obj] + else + list = [] + end - def render_json(obj, opts={}) - render_json_error(200, obj, opts) - end + elsif types = qs['type'] and type = types[0] + # ?type= to search plugins by 'type <type>' config param + list = @agent.plugins_info_by_type(type, opts) - def render_json_error(code, obj, opts={}) - if opts[:pretty_json] - js = JSON.pretty_generate(obj) - else - js = obj.to_json + else + # otherwise show all plugins + list = @agent.plugins_info_all(opts) + end + + return list, opts end - [code, {'Content-Type'=>'application/json'}, js] + + def render_json(obj, opts={}) + render_json_error(200, obj, opts) + end + + def render_json_error(code, obj, opts={}) + if opts[:pretty_json] + js = JSON.pretty_generate(obj) + else + js = obj.to_json + end + [code, {'Content-Type'=>'application/json'}, js] + end end - end - class LTSVMonitorServlet < MonitorServlet - def process(req, res) - list, opts = build_object(req, res) - return unless list + class LTSVMonitorServlet < MonitorServlet + def process(req, res) + list, opts = build_object(req, res) + return unless list - normalized = JSON.parse(list.to_json) + normalized = JSON.parse(list.to_json) - text = '' + text = '' - normalized.map {|hash| - row = [] - hash.each_pair {|k,v| - unless v.is_a?(Hash) || v.is_a?(Array) - row << "#{k}:#{v}" - end + normalized.map {|hash| + row = [] + hash.each_pair {|k,v| + unless v.is_a?(Hash) || v.is_a?(Array) + row << "#{k}:#{v}" + end + } + text << row.join("\t") << "\n" } - text << row.join("\t") << "\n" - } - [200, {'Content-Type'=>'text/plain'}, text] + [200, {'Content-Type'=>'text/plain'}, text] + end end - end - class JSONMonitorServlet < MonitorServlet - def process(req, res) - list, opts = build_object(req, res) - return unless list + class JSONMonitorServlet < MonitorServlet + def process(req, res) + list, opts = build_object(req, res) + return unless list - render_json({ - 'plugins' => list - }, opts) + render_json({ + 'plugins' => list + }, opts) + end end - end - def start - $log.debug "listening monitoring http server on http://#{@bind}:#{@port}/api/plugins" - @srv = WEBrick::HTTPServer.new({ - :BindAddress => @bind, - :Port => @port, - :Logger => WEBrick::Log.new(STDERR, WEBrick::Log::FATAL), - :AccessLog => [], - }) - @srv.mount('/api/plugins', LTSVMonitorServlet, self) - @srv.mount('/api/plugins.json', JSONMonitorServlet, self) - @thread = Thread.new { - @srv.start - } - end + def start + $log.debug "listening monitoring http server on http://#{@bind}:#{@port}/api/plugins" + @srv = WEBrick::HTTPServer.new({ + :BindAddress => @bind, + :Port => @port, + :Logger => WEBrick::Log.new(STDERR, WEBrick::Log::FATAL), + :AccessLog => [], + }) + @srv.mount('/api/plugins', LTSVMonitorServlet, self) + @srv.mount('/api/plugins.json', JSONMonitorServlet, self) + @thread = Thread.new { + @srv.start + } + end - def shutdown - if @srv - @srv.shutdown - @srv = nil + def shutdown + if @srv + @srv.shutdown + @srv = nil + end + if @thread + @thread.join + @thread = nil + end end - if @thread - @thread.join - @thread = nil - end - end - MONITOR_INFO = { - 'plugin_id' => 'plugin_id', - 'type' => 'config["type"]', - 'output_plugin' => 'is_a?(::Fluent::Output)', - 'buffer_queue_length' => '@buffer.queue_size', - 'buffer_total_queued_size' => '@buffer.total_queued_chunk_size', - 'retry_count' => '@error_history.size', - 'config' => 'config', - } + MONITOR_INFO = { + 'plugin_id' => 'plugin_id', + 'type' => 'config["type"]', + 'output_plugin' => 'is_a?(::Fluent::Output)', + 'buffer_queue_length' => '@buffer.queue_size', + 'buffer_total_queued_size' => '@buffer.total_queued_chunk_size', + 'retry_count' => '@error_history.size', + 'config' => 'config', + } - def all_plugins - array = [] + def all_plugins + array = [] - # get all input plugins - array.concat Engine.sources + # get all input plugins + array.concat Engine.sources - # get all output plugins - Engine.matches.each {|m| - MonitorAgentInput.collect_children(m.output, array) - } + # get all output plugins + Engine.matches.each {|m| + MonitorAgentInput.collect_children(m.output, array) + } - array - end + array + end - # get nexted plugins (such as <store> of the copy plugin) - # from the plugin `pe` recursively - def self.collect_children(pe, array=[]) - array << pe - if pe.is_a?(MultiOutput) && pe.respond_to?(:outputs) - pe.outputs.each {|nop| - collect_children(nop, array) - } + # get nexted plugins (such as <store> of the copy plugin) + # from the plugin `pe` recursively + def self.collect_children(pe, array=[]) + array << pe + if pe.is_a?(MultiOutput) && pe.respond_to?(:outputs) + pe.outputs.each {|nop| + collect_children(nop, array) + } + end + array end - array - end - # try to match the tag and get the info from the - # matched output plugin - def plugin_info_by_tag(tag, opts={}) - m = Engine.match(tag) - if m - pe = m.output - get_monitor_info(pe, opts) - else - nil + # try to match the tag and get the info from the + # matched output plugin + def plugin_info_by_tag(tag, opts={}) + m = Engine.match(tag) + if m + pe = m.output + get_monitor_info(pe, opts) + else + nil + end end - end - # search a plugin by plugin_id - def plugin_info_by_id(plugin_id, opts={}) - found = all_plugins.find {|pe| - pe.respond_to?(:plugin_id) && pe.plugin_id.to_s == plugin_id - } - if found - get_monitor_info(found, opts) - else - nil + # search a plugin by plugin_id + def plugin_info_by_id(plugin_id, opts={}) + found = all_plugins.find {|pe| + pe.respond_to?(:plugin_id) && pe.plugin_id.to_s == plugin_id + } + if found + get_monitor_info(found, opts) + else + nil + end end - end - # This method returns an array because - # multiple plugins could have the same type - def plugins_info_by_type(type, opts={}) - array = all_plugins.select {|pe| - pe.config['type'] == type rescue nil - } - array.map {|pe| - get_monitor_info(pe, opts) - } - end + # This method returns an array because + # multiple plugins could have the same type + def plugins_info_by_type(type, opts={}) + array = all_plugins.select {|pe| + pe.config['type'] == type rescue nil + } + array.map {|pe| + get_monitor_info(pe, opts) + } + end - def plugins_info_all(opts={}) - all_plugins.map {|pe| - get_monitor_info(pe, opts) - } - end + def plugins_info_all(opts={}) + all_plugins.map {|pe| + get_monitor_info(pe, opts) + } + end - # get monitor info from the plugin `pe` and return a hash object - def get_monitor_info(pe, opts={}) - obj = {} + # get monitor info from the plugin `pe` and return a hash object + def get_monitor_info(pe, opts={}) + obj = {} - # run MONITOR_INFO in plugins' instance context and store the info to obj - MONITOR_INFO.each_pair {|key,code| - begin - obj[key] = pe.instance_eval(code) - rescue - end - } + # run MONITOR_INFO in plugins' instance context and store the info to obj + MONITOR_INFO.each_pair {|key,code| + begin + obj[key] = pe.instance_eval(code) + rescue + end + } - # include all instance variables if :with_debug_info is set - if opts[:with_debug_info] - iv = {} - pe.instance_eval do - instance_variables.each {|sym| - key = sym.to_s[1..-1] # removes first '@' - iv[key] = instance_variable_get(sym) - } + # include all instance variables if :with_debug_info is set + if opts[:with_debug_info] + iv = {} + pe.instance_eval do + instance_variables.each {|sym| + key = sym.to_s[1..-1] # removes first '@' + iv[key] = instance_variable_get(sym) + } + end + obj['instance_variables'] = iv end - obj['instance_variables'] = iv - end - obj + obj + end end -end - end