lib/fluent/plugin/in_http.rb in fluentd-0.14.8 vs lib/fluent/plugin/in_http.rb in fluentd-0.14.9

- old
+ new

@@ -12,33 +12,34 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # +require 'fluent/plugin/input' +require 'fluent/plugin/parser' +require 'fluent/event' + +require 'http/parser' +require 'webrick/httputils' require 'uri' require 'socket' require 'json' -require 'cool.io' +module Fluent::Plugin + class InHttpParser < Parser + Fluent::Plugin.register_parser('in_http', self) + def parse(text) + # this plugin is dummy implementation not to raise error + yield nil, nil + end + end -require 'fluent/input' -require 'fluent/event' -require 'fluent/process' - -module Fluent class HttpInput < Input - Plugin.register_input('http', self) + Fluent::Plugin.register_input('http', self) - include DetachMultiProcessMixin + helpers :parser, :compat_parameters, :event_loop - require 'http/parser' - - def initialize - require 'webrick/httputils' - super - end - EMPTY_GIF_IMAGE = "GIF89a\u0001\u0000\u0001\u0000\x80\xFF\u0000\xFF\xFF\xFF\u0000\u0000\u0000,\u0000\u0000\u0000\u0000\u0001\u0000\u0001\u0000\u0000\u0002\u0002D\u0001\u0000;".force_encoding("UTF-8") desc 'The port to listen to.' config_param :port, :integer, default: 9880 desc 'The bind address to listen to.' @@ -50,29 +51,40 @@ config_param :backlog, :integer, default: nil desc 'Add HTTP_ prefix headers to the record.' config_param :add_http_headers, :bool, default: false desc 'Add REMOTE_ADDR header to the record.' config_param :add_remote_addr, :bool, default: false - desc 'The format of the HTTP body.' - config_param :format, :string, default: 'default' config_param :blocking_timeout, :time, default: 0.5 desc 'Set a white list of domains that can do CORS (Cross-Origin Resource Sharing)' config_param :cors_allow_origins, :array, default: nil desc 'Respond with empty gif image of 1x1 pixel.' config_param :respond_with_empty_img, :bool, default: false + config_section :parse do + config_set_default :@type, 'in_http' + end + + EVENT_RECORD_PARAMETER = '_event_record' + def configure(conf) + compat_parameters_convert(conf, :parser) + super - m = if @format == 'default' + m = if @parser_configs.first['@type'] == 'in_http' + @parser_msgpack = parser_create(usage: 'parser_in_http_msgpack', type: 'msgpack') + @parser_msgpack.estimate_current_event = false + @parser_json = parser_create(usage: 'parser_in_http_json', type: 'json') + @parser_json.estimate_current_event = false + @format_name = 'default' method(:parse_params_default) else - @parser = Plugin.new_parser(@format) - @parser.configure(conf) + @parser = parser_create + @format_name = @parser_configs.first['@type'] method(:parse_params_with_parser) end - (class << self; self; end).module_eval do + self.singleton_class.module_eval do define_method(:parse_params, m) end end class KeepaliveManager < Coolio::TimerWatcher @@ -98,51 +110,41 @@ } end end def start - log.debug "listening http on #{@bind}:#{@port}" + @_event_loop_run_timeout = @blocking_timeout + super + + log.debug "listening http", bind: @bind, port: @port + socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH'] if Fluent.windows? socket_manager_path = socket_manager_path.to_i end client = ServerEngine::SocketManager::Client.new(socket_manager_path) lsock = client.listen_tcp(@bind, @port) - detach_multi_process do - super - @km = KeepaliveManager.new(@keepalive_timeout) - @lsock = Coolio::TCPServer.new(lsock, nil, Handler, @km, method(:on_request), - @body_size_limit, @format, log, - @cors_allow_origins) - @lsock.listen(@backlog) unless @backlog.nil? + @km = KeepaliveManager.new(@keepalive_timeout) + @lsock = Coolio::TCPServer.new( + lsock, nil, Handler, @km, method(:on_request), + @body_size_limit, @format_name, log, + @cors_allow_origins + ) + @lsock.listen(@backlog) unless @backlog.nil? + event_loop_attach(@km) + event_loop_attach(@lsock) - @loop = Coolio::Loop.new - @loop.attach(@km) - @loop.attach(@lsock) - - @thread = Thread.new(&method(:run)) - end + @float_time_parser = Fluent::NumericTimeParser.new(:float) end - def shutdown - @loop.watchers.each {|w| w.detach } - @loop.stop + def close @lsock.close - @thread.join - super end - def run - @loop.run(@blocking_timeout) - rescue - log.error "unexpected error", error: $!.to_s - log.error_backtrace - end - def on_request(path_info, params) begin path = path_info[1..-1] # remove / tag = path.split('/').join('.') record_time, record = parse_params(params) @@ -168,23 +170,23 @@ record['REMOTE_ADDR'] = params['REMOTE_ADDR'] end end time = if param_time = params['time'] param_time = param_time.to_f - param_time.zero? ? Engine.now : Fluent::EventTime.from_time(Time.at(param_time)) + param_time.zero? ? Fluent::Engine.now : @float_time_parser.parse(param_time) else - record_time.nil? ? Engine.now : record_time + record_time.nil? ? Fluent::Engine.now : record_time end rescue return ["400 Bad Request", {'Content-Type'=>'text/plain'}, "400 Bad Request\n#{$!}\n"] end # TODO server error begin # Support batched requests if record.is_a?(Array) - mes = MultiEventStream.new + mes = Fluent::MultiEventStream.new record.each do |single_record| if @add_http_headers params.each_pair { |k,v| if k.start_with?("HTTP_") single_record[k] = v @@ -213,43 +215,44 @@ end private def parse_params_default(params) - record = if msgpack = params['msgpack'] - Engine.msgpack_factory.unpacker.feed(msgpack).read - elsif js = params['json'] - JSON.parse(js) - else - raise "'json' or 'msgpack' parameter is required" - end - return nil, record + if msgpack = params['msgpack'] + @parser_msgpack.parse(msgpack) do |_time, record| + return nil, record + end + elsif js = params['json'] + @parser_json.parse(js) do |_time, record| + return nil, record + end + else + raise "'json' or 'msgpack' parameter is required" + end end - EVENT_RECORD_PARAMETER = '_event_record' - def parse_params_with_parser(params) if content = params[EVENT_RECORD_PARAMETER] @parser.parse(content) { |time, record| - raise "Received event is not #{@format}: #{content}" if record.nil? + raise "Received event is not #{@format_name}: #{content}" if record.nil? return time, record } else raise "'#{EVENT_RECORD_PARAMETER}' parameter is required" end end class Handler < Coolio::Socket attr_reader :content_type - def initialize(io, km, callback, body_size_limit, format, log, cors_allow_origins) + def initialize(io, km, callback, body_size_limit, format_name, log, cors_allow_origins) super(io) @km = km @callback = callback @body_size_limit = body_size_limit @next_close = false - @format = format + @format_name = format_name @log = log @cors_allow_origins = cors_allow_origins @idle = 0 @km.add(self) @@ -353,10 +356,10 @@ @env['REMOTE_ADDR'] = @remote_addr if @remote_addr uri = URI.parse(@parser.request_url) params = WEBrick::HTTPUtils.parse_query(uri.query) - if @format != 'default' + if @format_name != 'default' params[EVENT_RECORD_PARAMETER] = @body elsif @content_type =~ /^application\/x-www-form-urlencoded/ params.update WEBrick::HTTPUtils.parse_query(@body) elsif @content_type =~ /^multipart\/form-data; boundary=(.+)/ boundary = WEBrick::HTTPUtils.dequote($1)