lib/fluent/plugin/in_http.rb in fluentd-0.14.8 vs lib/fluent/plugin/in_http.rb in fluentd-0.14.9
- old
+ new
@@ -12,33 +12,34 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
+require 'fluent/plugin/input'
+require 'fluent/plugin/parser'
+require 'fluent/event'
+
+require 'http/parser'
+require 'webrick/httputils'
require 'uri'
require 'socket'
require 'json'
-require 'cool.io'
+module Fluent::Plugin
+ class InHttpParser < Parser
+ Fluent::Plugin.register_parser('in_http', self)
+ def parse(text)
+ # this plugin is dummy implementation not to raise error
+ yield nil, nil
+ end
+ end
-require 'fluent/input'
-require 'fluent/event'
-require 'fluent/process'
-
-module Fluent
class HttpInput < Input
- Plugin.register_input('http', self)
+ Fluent::Plugin.register_input('http', self)
- include DetachMultiProcessMixin
+ helpers :parser, :compat_parameters, :event_loop
- require 'http/parser'
-
- def initialize
- require 'webrick/httputils'
- super
- end
-
EMPTY_GIF_IMAGE = "GIF89a\u0001\u0000\u0001\u0000\x80\xFF\u0000\xFF\xFF\xFF\u0000\u0000\u0000,\u0000\u0000\u0000\u0000\u0001\u0000\u0001\u0000\u0000\u0002\u0002D\u0001\u0000;".force_encoding("UTF-8")
desc 'The port to listen to.'
config_param :port, :integer, default: 9880
desc 'The bind address to listen to.'
@@ -50,29 +51,40 @@
config_param :backlog, :integer, default: nil
desc 'Add HTTP_ prefix headers to the record.'
config_param :add_http_headers, :bool, default: false
desc 'Add REMOTE_ADDR header to the record.'
config_param :add_remote_addr, :bool, default: false
- desc 'The format of the HTTP body.'
- config_param :format, :string, default: 'default'
config_param :blocking_timeout, :time, default: 0.5
desc 'Set a white list of domains that can do CORS (Cross-Origin Resource Sharing)'
config_param :cors_allow_origins, :array, default: nil
desc 'Respond with empty gif image of 1x1 pixel.'
config_param :respond_with_empty_img, :bool, default: false
+ config_section :parse do
+ config_set_default :@type, 'in_http'
+ end
+
+ EVENT_RECORD_PARAMETER = '_event_record'
+
def configure(conf)
+ compat_parameters_convert(conf, :parser)
+
super
- m = if @format == 'default'
+ m = if @parser_configs.first['@type'] == 'in_http'
+ @parser_msgpack = parser_create(usage: 'parser_in_http_msgpack', type: 'msgpack')
+ @parser_msgpack.estimate_current_event = false
+ @parser_json = parser_create(usage: 'parser_in_http_json', type: 'json')
+ @parser_json.estimate_current_event = false
+ @format_name = 'default'
method(:parse_params_default)
else
- @parser = Plugin.new_parser(@format)
- @parser.configure(conf)
+ @parser = parser_create
+ @format_name = @parser_configs.first['@type']
method(:parse_params_with_parser)
end
- (class << self; self; end).module_eval do
+ self.singleton_class.module_eval do
define_method(:parse_params, m)
end
end
class KeepaliveManager < Coolio::TimerWatcher
@@ -98,51 +110,41 @@
}
end
end
def start
- log.debug "listening http on #{@bind}:#{@port}"
+ @_event_loop_run_timeout = @blocking_timeout
+ super
+
+ log.debug "listening http", bind: @bind, port: @port
+
socket_manager_path = ENV['SERVERENGINE_SOCKETMANAGER_PATH']
if Fluent.windows?
socket_manager_path = socket_manager_path.to_i
end
client = ServerEngine::SocketManager::Client.new(socket_manager_path)
lsock = client.listen_tcp(@bind, @port)
- detach_multi_process do
- super
- @km = KeepaliveManager.new(@keepalive_timeout)
- @lsock = Coolio::TCPServer.new(lsock, nil, Handler, @km, method(:on_request),
- @body_size_limit, @format, log,
- @cors_allow_origins)
- @lsock.listen(@backlog) unless @backlog.nil?
+ @km = KeepaliveManager.new(@keepalive_timeout)
+ @lsock = Coolio::TCPServer.new(
+ lsock, nil, Handler, @km, method(:on_request),
+ @body_size_limit, @format_name, log,
+ @cors_allow_origins
+ )
+ @lsock.listen(@backlog) unless @backlog.nil?
+ event_loop_attach(@km)
+ event_loop_attach(@lsock)
- @loop = Coolio::Loop.new
- @loop.attach(@km)
- @loop.attach(@lsock)
-
- @thread = Thread.new(&method(:run))
- end
+ @float_time_parser = Fluent::NumericTimeParser.new(:float)
end
- def shutdown
- @loop.watchers.each {|w| w.detach }
- @loop.stop
+ def close
@lsock.close
- @thread.join
-
super
end
- def run
- @loop.run(@blocking_timeout)
- rescue
- log.error "unexpected error", error: $!.to_s
- log.error_backtrace
- end
-
def on_request(path_info, params)
begin
path = path_info[1..-1] # remove /
tag = path.split('/').join('.')
record_time, record = parse_params(params)
@@ -168,23 +170,23 @@
record['REMOTE_ADDR'] = params['REMOTE_ADDR']
end
end
time = if param_time = params['time']
param_time = param_time.to_f
- param_time.zero? ? Engine.now : Fluent::EventTime.from_time(Time.at(param_time))
+ param_time.zero? ? Fluent::Engine.now : @float_time_parser.parse(param_time)
else
- record_time.nil? ? Engine.now : record_time
+ record_time.nil? ? Fluent::Engine.now : record_time
end
rescue
return ["400 Bad Request", {'Content-Type'=>'text/plain'}, "400 Bad Request\n#{$!}\n"]
end
# TODO server error
begin
# Support batched requests
if record.is_a?(Array)
- mes = MultiEventStream.new
+ mes = Fluent::MultiEventStream.new
record.each do |single_record|
if @add_http_headers
params.each_pair { |k,v|
if k.start_with?("HTTP_")
single_record[k] = v
@@ -213,43 +215,44 @@
end
private
def parse_params_default(params)
- record = if msgpack = params['msgpack']
- Engine.msgpack_factory.unpacker.feed(msgpack).read
- elsif js = params['json']
- JSON.parse(js)
- else
- raise "'json' or 'msgpack' parameter is required"
- end
- return nil, record
+ if msgpack = params['msgpack']
+ @parser_msgpack.parse(msgpack) do |_time, record|
+ return nil, record
+ end
+ elsif js = params['json']
+ @parser_json.parse(js) do |_time, record|
+ return nil, record
+ end
+ else
+ raise "'json' or 'msgpack' parameter is required"
+ end
end
- EVENT_RECORD_PARAMETER = '_event_record'
-
def parse_params_with_parser(params)
if content = params[EVENT_RECORD_PARAMETER]
@parser.parse(content) { |time, record|
- raise "Received event is not #{@format}: #{content}" if record.nil?
+ raise "Received event is not #{@format_name}: #{content}" if record.nil?
return time, record
}
else
raise "'#{EVENT_RECORD_PARAMETER}' parameter is required"
end
end
class Handler < Coolio::Socket
attr_reader :content_type
- def initialize(io, km, callback, body_size_limit, format, log, cors_allow_origins)
+ def initialize(io, km, callback, body_size_limit, format_name, log, cors_allow_origins)
super(io)
@km = km
@callback = callback
@body_size_limit = body_size_limit
@next_close = false
- @format = format
+ @format_name = format_name
@log = log
@cors_allow_origins = cors_allow_origins
@idle = 0
@km.add(self)
@@ -353,10 +356,10 @@
@env['REMOTE_ADDR'] = @remote_addr if @remote_addr
uri = URI.parse(@parser.request_url)
params = WEBrick::HTTPUtils.parse_query(uri.query)
- if @format != 'default'
+ if @format_name != 'default'
params[EVENT_RECORD_PARAMETER] = @body
elsif @content_type =~ /^application\/x-www-form-urlencoded/
params.update WEBrick::HTTPUtils.parse_query(@body)
elsif @content_type =~ /^multipart\/form-data; boundary=(.+)/
boundary = WEBrick::HTTPUtils.dequote($1)