lib/fluent/plugin/in_tail.rb in fluentd-0.12.28 vs lib/fluent/plugin/in_tail.rb in fluentd-0.12.29

- old
+ new

@@ -48,18 +48,14 @@ config_param :read_lines_limit, :integer, default: 1000 desc 'The interval of flushing the buffer for multiline format' config_param :multiline_flush_interval, :time, default: nil desc 'Enable the additional watch timer.' config_param :enable_watch_timer, :bool, default: true + desc 'The encoding after conversion of the input.' + config_param :encoding, :string, default: nil desc 'The encoding of the input.' - config_param :encoding, default: nil do |encoding_name| - begin - Encoding.find(encoding_name) - rescue ArgumentError => e - raise ConfigError, e.message - end - end + config_param :from_encoding, :string, default: nil desc 'Add the log path being tailed to records. Specify the field name to be used.' config_param :path_key, :string, default: nil attr_reader :paths @@ -76,10 +72,11 @@ $log.warn "this parameter is highly recommended to save the position to resume tailing." end configure_parser(conf) configure_tag + configure_encoding @multiline_mode = conf['format'] =~ /multiline/ @receive_handler = if @multiline_mode method(:parse_multilines) else @@ -100,10 +97,29 @@ @tag_prefix = nil @tag_suffix = nil end end + def configure_encoding + unless @encoding + if @from_encoding + raise ConfigError, "tail: 'from_encoding' parameter must be specified with 'encoding' parameter." + end + end + + @encoding = parse_encoding_param(@encoding) if @encoding + @from_encoding = parse_encoding_param(@from_encoding) if @from_encoding + end + + def parse_encoding_param(encoding_name) + begin + Encoding.find(encoding_name) if encoding_name + rescue ArgumentError => e + raise ConfigError, e.message + end + end + def start if @pos_file @pf_file = File.open(@pos_file, File::RDWR|File::CREAT, DEFAULT_FILE_PERMISSION) @pf_file.sync = true @pf = PositionFile.parse(@pf_file) @@ -230,11 +246,17 @@ end def flush_buffer(tw) if lb = tw.line_buffer lb.chomp! - lb.force_encoding(@encoding) if @encoding + if @encoding + if @from_encoding + lb.encode!(@encoding, @from_encoding) + else + lb.force_encoding(@encoding) + end + end @parser.parse(lb) { |time, record| if time && record tag = if @tag_prefix || @tag_suffix @tag_prefix + tw.tag + @tag_suffix else @@ -279,10 +301,16 @@ end def convert_line_to_event(line, es, tail_watcher) begin line.chomp! # remove \n - line.force_encoding(@encoding) if @encoding + if @encoding + if @from_encoding + line.encode!(@encoding, @from_encoding) + else + line.force_encoding(@encoding) + end + end @parser.parse(line) { |time, record| if time && record record[@path_key] ||= tail_watcher.path unless @path_key.nil? es.add(time, record) else