lib/fluent/plugin/in_monitor_agent.rb in fluentd-0.10.35 vs lib/fluent/plugin/in_monitor_agent.rb in fluentd-0.10.36
- old
+ new
@@ -14,275 +14,273 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
module Fluent
+ class MonitorAgentInput < Input
+ Plugin.register_input('monitor_agent', self)
-class MonitorAgentInput < Input
- Plugin.register_input('monitor_agent', self)
+ require 'webrick'
- require 'webrick'
-
- def initialize
- require 'cgi'
- super
- end
-
- config_param :bind, :string, :default => '0.0.0.0'
- config_param :port, :integer, :default => 24220
-
- class MonitorServlet < WEBrick::HTTPServlet::AbstractServlet
- def initialize(server, agent)
- @agent = agent
+ def initialize
+ require 'cgi'
+ super
end
- def do_GET(req, res)
- begin
- code, header, body = process(req, res)
- rescue
- code, header, body = render_json_error(500, {
- 'message '=> 'Internal Server Error',
- 'error' => "#{$!}",
- 'backgrace'=> $!.backtrace,
- })
- end
+ config_param :bind, :string, :default => '0.0.0.0'
+ config_param :port, :integer, :default => 24220
- # set response code, header and body
- res.status = code
- header.each_pair {|k,v|
- res[k] = v
- }
- res.body = body
- end
-
- def build_object(req, res)
- unless req.path_info == ""
- return render_json_error(404, "Not found")
+ class MonitorServlet < WEBrick::HTTPServlet::AbstractServlet
+ def initialize(server, agent)
+ @agent = agent
end
- # parse ?=query string
- if req.query_string
+ def do_GET(req, res)
begin
- qs = CGI.parse(req.query_string)
+ code, header, body = process(req, res)
rescue
- return render_json_error(400, "Invalid query string")
+ code, header, body = render_json_error(500, {
+ 'message '=> 'Internal Server Error',
+ 'error' => "#{$!}",
+ 'backgrace'=> $!.backtrace,
+ })
end
- else
- qs = Hash.new {|h,k| [] }
- end
- # if ?debug=1 is set, set :with_debug_info for get_monitor_info
- # and :pretty_json for render_json_error
- opts = {}
- if s = qs['debug'] and s[0]
- opts[:with_debug_info] = true
- opts[:pretty_json] = true
+ # set response code, header and body
+ res.status = code
+ header.each_pair {|k,v|
+ res[k] = v
+ }
+ res.body = body
end
- if tags = qs['tag'] and tag = tags[0]
- # ?tag= to search an output plugin by match pattern
- if obj = @agent.plugin_info_by_tag(tag, opts)
- list = [obj]
- else
- list = []
+ def build_object(req, res)
+ unless req.path_info == ""
+ return render_json_error(404, "Not found")
end
- elsif plugin_ids = qs['id'] and plugin_id = plugin_ids[0]
- # ?id= to search a plugin by 'id <plugin_id>' config param
- if obj = @agent.plugin_info_by_id(plugin_id, opts)
- list = [obj]
+ # parse ?=query string
+ if req.query_string
+ begin
+ qs = CGI.parse(req.query_string)
+ rescue
+ return render_json_error(400, "Invalid query string")
+ end
else
- list = []
+ qs = Hash.new {|h,k| [] }
end
- elsif types = qs['type'] and type = types[0]
- # ?type= to search plugins by 'type <type>' config param
- list = @agent.plugins_info_by_type(type, opts)
+ # if ?debug=1 is set, set :with_debug_info for get_monitor_info
+ # and :pretty_json for render_json_error
+ opts = {}
+ if s = qs['debug'] and s[0]
+ opts[:with_debug_info] = true
+ opts[:pretty_json] = true
+ end
- else
- # otherwise show all plugins
- list = @agent.plugins_info_all(opts)
- end
+ if tags = qs['tag'] and tag = tags[0]
+ # ?tag= to search an output plugin by match pattern
+ if obj = @agent.plugin_info_by_tag(tag, opts)
+ list = [obj]
+ else
+ list = []
+ end
- return list, opts
- end
+ elsif plugin_ids = qs['id'] and plugin_id = plugin_ids[0]
+ # ?id= to search a plugin by 'id <plugin_id>' config param
+ if obj = @agent.plugin_info_by_id(plugin_id, opts)
+ list = [obj]
+ else
+ list = []
+ end
- def render_json(obj, opts={})
- render_json_error(200, obj, opts)
- end
+ elsif types = qs['type'] and type = types[0]
+ # ?type= to search plugins by 'type <type>' config param
+ list = @agent.plugins_info_by_type(type, opts)
- def render_json_error(code, obj, opts={})
- if opts[:pretty_json]
- js = JSON.pretty_generate(obj)
- else
- js = obj.to_json
+ else
+ # otherwise show all plugins
+ list = @agent.plugins_info_all(opts)
+ end
+
+ return list, opts
end
- [code, {'Content-Type'=>'application/json'}, js]
+
+ def render_json(obj, opts={})
+ render_json_error(200, obj, opts)
+ end
+
+ def render_json_error(code, obj, opts={})
+ if opts[:pretty_json]
+ js = JSON.pretty_generate(obj)
+ else
+ js = obj.to_json
+ end
+ [code, {'Content-Type'=>'application/json'}, js]
+ end
end
- end
- class LTSVMonitorServlet < MonitorServlet
- def process(req, res)
- list, opts = build_object(req, res)
- return unless list
+ class LTSVMonitorServlet < MonitorServlet
+ def process(req, res)
+ list, opts = build_object(req, res)
+ return unless list
- normalized = JSON.parse(list.to_json)
+ normalized = JSON.parse(list.to_json)
- text = ''
+ text = ''
- normalized.map {|hash|
- row = []
- hash.each_pair {|k,v|
- unless v.is_a?(Hash) || v.is_a?(Array)
- row << "#{k}:#{v}"
- end
+ normalized.map {|hash|
+ row = []
+ hash.each_pair {|k,v|
+ unless v.is_a?(Hash) || v.is_a?(Array)
+ row << "#{k}:#{v}"
+ end
+ }
+ text << row.join("\t") << "\n"
}
- text << row.join("\t") << "\n"
- }
- [200, {'Content-Type'=>'text/plain'}, text]
+ [200, {'Content-Type'=>'text/plain'}, text]
+ end
end
- end
- class JSONMonitorServlet < MonitorServlet
- def process(req, res)
- list, opts = build_object(req, res)
- return unless list
+ class JSONMonitorServlet < MonitorServlet
+ def process(req, res)
+ list, opts = build_object(req, res)
+ return unless list
- render_json({
- 'plugins' => list
- }, opts)
+ render_json({
+ 'plugins' => list
+ }, opts)
+ end
end
- end
- def start
- $log.debug "listening monitoring http server on http://#{@bind}:#{@port}/api/plugins"
- @srv = WEBrick::HTTPServer.new({
- :BindAddress => @bind,
- :Port => @port,
- :Logger => WEBrick::Log.new(STDERR, WEBrick::Log::FATAL),
- :AccessLog => [],
- })
- @srv.mount('/api/plugins', LTSVMonitorServlet, self)
- @srv.mount('/api/plugins.json', JSONMonitorServlet, self)
- @thread = Thread.new {
- @srv.start
- }
- end
+ def start
+ $log.debug "listening monitoring http server on http://#{@bind}:#{@port}/api/plugins"
+ @srv = WEBrick::HTTPServer.new({
+ :BindAddress => @bind,
+ :Port => @port,
+ :Logger => WEBrick::Log.new(STDERR, WEBrick::Log::FATAL),
+ :AccessLog => [],
+ })
+ @srv.mount('/api/plugins', LTSVMonitorServlet, self)
+ @srv.mount('/api/plugins.json', JSONMonitorServlet, self)
+ @thread = Thread.new {
+ @srv.start
+ }
+ end
- def shutdown
- if @srv
- @srv.shutdown
- @srv = nil
+ def shutdown
+ if @srv
+ @srv.shutdown
+ @srv = nil
+ end
+ if @thread
+ @thread.join
+ @thread = nil
+ end
end
- if @thread
- @thread.join
- @thread = nil
- end
- end
- MONITOR_INFO = {
- 'plugin_id' => 'plugin_id',
- 'type' => 'config["type"]',
- 'output_plugin' => 'is_a?(::Fluent::Output)',
- 'buffer_queue_length' => '@buffer.queue_size',
- 'buffer_total_queued_size' => '@buffer.total_queued_chunk_size',
- 'retry_count' => '@error_history.size',
- 'config' => 'config',
- }
+ MONITOR_INFO = {
+ 'plugin_id' => 'plugin_id',
+ 'type' => 'config["type"]',
+ 'output_plugin' => 'is_a?(::Fluent::Output)',
+ 'buffer_queue_length' => '@buffer.queue_size',
+ 'buffer_total_queued_size' => '@buffer.total_queued_chunk_size',
+ 'retry_count' => '@error_history.size',
+ 'config' => 'config',
+ }
- def all_plugins
- array = []
+ def all_plugins
+ array = []
- # get all input plugins
- array.concat Engine.sources
+ # get all input plugins
+ array.concat Engine.sources
- # get all output plugins
- Engine.matches.each {|m|
- MonitorAgentInput.collect_children(m.output, array)
- }
+ # get all output plugins
+ Engine.matches.each {|m|
+ MonitorAgentInput.collect_children(m.output, array)
+ }
- array
- end
+ array
+ end
- # get nexted plugins (such as <store> of the copy plugin)
- # from the plugin `pe` recursively
- def self.collect_children(pe, array=[])
- array << pe
- if pe.is_a?(MultiOutput) && pe.respond_to?(:outputs)
- pe.outputs.each {|nop|
- collect_children(nop, array)
- }
+ # get nexted plugins (such as <store> of the copy plugin)
+ # from the plugin `pe` recursively
+ def self.collect_children(pe, array=[])
+ array << pe
+ if pe.is_a?(MultiOutput) && pe.respond_to?(:outputs)
+ pe.outputs.each {|nop|
+ collect_children(nop, array)
+ }
+ end
+ array
end
- array
- end
- # try to match the tag and get the info from the
- # matched output plugin
- def plugin_info_by_tag(tag, opts={})
- m = Engine.match(tag)
- if m
- pe = m.output
- get_monitor_info(pe, opts)
- else
- nil
+ # try to match the tag and get the info from the
+ # matched output plugin
+ def plugin_info_by_tag(tag, opts={})
+ m = Engine.match(tag)
+ if m
+ pe = m.output
+ get_monitor_info(pe, opts)
+ else
+ nil
+ end
end
- end
- # search a plugin by plugin_id
- def plugin_info_by_id(plugin_id, opts={})
- found = all_plugins.find {|pe|
- pe.respond_to?(:plugin_id) && pe.plugin_id.to_s == plugin_id
- }
- if found
- get_monitor_info(found, opts)
- else
- nil
+ # search a plugin by plugin_id
+ def plugin_info_by_id(plugin_id, opts={})
+ found = all_plugins.find {|pe|
+ pe.respond_to?(:plugin_id) && pe.plugin_id.to_s == plugin_id
+ }
+ if found
+ get_monitor_info(found, opts)
+ else
+ nil
+ end
end
- end
- # This method returns an array because
- # multiple plugins could have the same type
- def plugins_info_by_type(type, opts={})
- array = all_plugins.select {|pe|
- pe.config['type'] == type rescue nil
- }
- array.map {|pe|
- get_monitor_info(pe, opts)
- }
- end
+ # This method returns an array because
+ # multiple plugins could have the same type
+ def plugins_info_by_type(type, opts={})
+ array = all_plugins.select {|pe|
+ pe.config['type'] == type rescue nil
+ }
+ array.map {|pe|
+ get_monitor_info(pe, opts)
+ }
+ end
- def plugins_info_all(opts={})
- all_plugins.map {|pe|
- get_monitor_info(pe, opts)
- }
- end
+ def plugins_info_all(opts={})
+ all_plugins.map {|pe|
+ get_monitor_info(pe, opts)
+ }
+ end
- # get monitor info from the plugin `pe` and return a hash object
- def get_monitor_info(pe, opts={})
- obj = {}
+ # get monitor info from the plugin `pe` and return a hash object
+ def get_monitor_info(pe, opts={})
+ obj = {}
- # run MONITOR_INFO in plugins' instance context and store the info to obj
- MONITOR_INFO.each_pair {|key,code|
- begin
- obj[key] = pe.instance_eval(code)
- rescue
- end
- }
+ # run MONITOR_INFO in plugins' instance context and store the info to obj
+ MONITOR_INFO.each_pair {|key,code|
+ begin
+ obj[key] = pe.instance_eval(code)
+ rescue
+ end
+ }
- # include all instance variables if :with_debug_info is set
- if opts[:with_debug_info]
- iv = {}
- pe.instance_eval do
- instance_variables.each {|sym|
- key = sym.to_s[1..-1] # removes first '@'
- iv[key] = instance_variable_get(sym)
- }
+ # include all instance variables if :with_debug_info is set
+ if opts[:with_debug_info]
+ iv = {}
+ pe.instance_eval do
+ instance_variables.each {|sym|
+ key = sym.to_s[1..-1] # removes first '@'
+ iv[key] = instance_variable_get(sym)
+ }
+ end
+ obj['instance_variables'] = iv
end
- obj['instance_variables'] = iv
- end
- obj
+ obj
+ end
end
-end
-
end