lib/fluent/plugin/in_tail.rb in fluentd-0.12.28 vs lib/fluent/plugin/in_tail.rb in fluentd-0.12.29
- old
+ new
@@ -48,18 +48,14 @@
config_param :read_lines_limit, :integer, default: 1000
desc 'The interval of flushing the buffer for multiline format'
config_param :multiline_flush_interval, :time, default: nil
desc 'Enable the additional watch timer.'
config_param :enable_watch_timer, :bool, default: true
+ desc 'The encoding after conversion of the input.'
+ config_param :encoding, :string, default: nil
desc 'The encoding of the input.'
- config_param :encoding, default: nil do |encoding_name|
- begin
- Encoding.find(encoding_name)
- rescue ArgumentError => e
- raise ConfigError, e.message
- end
- end
+ config_param :from_encoding, :string, default: nil
desc 'Add the log path being tailed to records. Specify the field name to be used.'
config_param :path_key, :string, default: nil
attr_reader :paths
@@ -76,10 +72,11 @@
$log.warn "this parameter is highly recommended to save the position to resume tailing."
end
configure_parser(conf)
configure_tag
+ configure_encoding
@multiline_mode = conf['format'] =~ /multiline/
@receive_handler = if @multiline_mode
method(:parse_multilines)
else
@@ -100,10 +97,29 @@
@tag_prefix = nil
@tag_suffix = nil
end
end
+ def configure_encoding
+ unless @encoding
+ if @from_encoding
+ raise ConfigError, "tail: 'from_encoding' parameter must be specified with 'encoding' parameter."
+ end
+ end
+
+ @encoding = parse_encoding_param(@encoding) if @encoding
+ @from_encoding = parse_encoding_param(@from_encoding) if @from_encoding
+ end
+
+ def parse_encoding_param(encoding_name)
+ begin
+ Encoding.find(encoding_name) if encoding_name
+ rescue ArgumentError => e
+ raise ConfigError, e.message
+ end
+ end
+
def start
if @pos_file
@pf_file = File.open(@pos_file, File::RDWR|File::CREAT, DEFAULT_FILE_PERMISSION)
@pf_file.sync = true
@pf = PositionFile.parse(@pf_file)
@@ -230,11 +246,17 @@
end
def flush_buffer(tw)
if lb = tw.line_buffer
lb.chomp!
- lb.force_encoding(@encoding) if @encoding
+ if @encoding
+ if @from_encoding
+ lb.encode!(@encoding, @from_encoding)
+ else
+ lb.force_encoding(@encoding)
+ end
+ end
@parser.parse(lb) { |time, record|
if time && record
tag = if @tag_prefix || @tag_suffix
@tag_prefix + tw.tag + @tag_suffix
else
@@ -279,10 +301,16 @@
end
def convert_line_to_event(line, es, tail_watcher)
begin
line.chomp! # remove \n
- line.force_encoding(@encoding) if @encoding
+ if @encoding
+ if @from_encoding
+ line.encode!(@encoding, @from_encoding)
+ else
+ line.force_encoding(@encoding)
+ end
+ end
@parser.parse(line) { |time, record|
if time && record
record[@path_key] ||= tail_watcher.path unless @path_key.nil?
es.add(time, record)
else