# # Fluentd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # require 'fluent/plugin/output' require 'fluent/env' require 'fluent/config/error' require 'yajl' module Fluent::Plugin class ExecFilterOutput < Output Fluent::Plugin.register_output('exec_filter', self) helpers :compat_parameters, :inject, :formatter, :parser, :extract, :child_process, :event_emitter desc 'The command (program) to execute.' config_param :command, :string config_param :remove_prefix, :string, default: nil, deprecated: "use @label instead for event routing" config_param :add_prefix, :string, default: nil, deprecated: "use @label instead for event routing" config_section :inject do config_set_default :time_type, :unixtime end config_section :format do config_set_default :@type, 'tsv' config_set_default :localtime, true end config_section :parse do config_set_default :@type, 'tsv' config_set_default :time_key, nil config_set_default :time_format, nil config_set_default :localtime, true config_set_default :estimate_current_event, false end config_section :extract do config_set_default :time_type, :float end config_section :buffer do config_set_default :flush_mode, :interval config_set_default :flush_interval, 1 end config_param :tag, :string, default: nil config_param :tag_key, :string, default: nil, deprecated: "use 'tag_key' in / instead" config_param :time_key, :string, default: nil, deprecated: "use 'time_key' in / instead" config_param :time_format, :string, default: nil, deprecated: "use 'time_format' in / instead" desc 'The default block size to read if parser requires partial read.' config_param :read_block_size, :size, default: 10240 # 10k desc 'The number of spawned process for command.' config_param :num_children, :integer, default: 1 desc 'Respawn command when command exit. ["none", "inf" or positive integer for times to respawn (default: none)]' # nil, 'none' or 0: no respawn, 'inf' or -1: infinite times, positive integer: try to respawn specified times only config_param :child_respawn, :string, default: nil # 0: output logs for all of messages to emit config_param :suppress_error_log_interval, :time, default: 0 attr_reader :formatter, :parser # for tests KEYS_FOR_IN_AND_OUT = { 'tag_key' => ['in_tag_key', 'out_tag_key'], 'time_key' => ['in_time_key', 'out_time_key'], 'time_format' => ['in_time_format', 'out_time_format'], } COMPAT_INJECT_PARAMS = { 'in_tag_key' => 'tag_key', 'in_time_key' => 'time_key', 'in_time_format' => 'time_format', } COMPAT_FORMAT_PARAMS = { 'in_format' => '@type', 'in_keys' => 'keys', } COMPAT_PARSE_PARAMS = { 'out_format' => '@type', 'out_keys' => 'keys', 'out_stream_buffer_size' => 'stream_buffer_size', } COMPAT_EXTRACT_PARAMS = { 'out_tag_key' => 'tag_key', 'out_time_key' => 'time_key', 'out_time_format' => 'time_format', } def exec_filter_compat_parameters_copy_to_subsection!(conf, subsection_name, params) return unless conf.elements(subsection_name).empty? return unless params.keys.any?{|k| conf.has_key?(k) } hash = {} params.each_pair do |compat, current| hash[current] = conf[compat] if conf.has_key?(compat) end conf.elements << Fluent::Config::Element.new(subsection_name, '', hash, []) end def exec_filter_compat_parameters_convert!(conf) KEYS_FOR_IN_AND_OUT.each_pair do |inout, keys| if conf.has_key?(inout) keys.each do |k| conf[k] = conf[inout] end end end exec_filter_compat_parameters_copy_to_subsection!(conf, 'inject', COMPAT_INJECT_PARAMS) exec_filter_compat_parameters_copy_to_subsection!(conf, 'format', COMPAT_FORMAT_PARAMS) exec_filter_compat_parameters_copy_to_subsection!(conf, 'parse', COMPAT_PARSE_PARAMS) exec_filter_compat_parameters_copy_to_subsection!(conf, 'extract', COMPAT_EXTRACT_PARAMS) end def configure(conf) exec_filter_compat_parameters_convert!(conf) compat_parameters_convert(conf, :buffer) if inject_section = conf.elements('inject').first if inject_section.has_key?('time_format') inject_section['time_type'] ||= 'string' end end if extract_section = conf.elements('extract').first if extract_section.has_key?('time_format') extract_section['time_type'] ||= 'string' end end super if !@tag && (!@extract_config || !@extract_config.tag_key) raise Fluent::ConfigError, "'tag' or ' tag_key ' option is required on exec_filter output" end @formatter = formatter_create @parser = parser_create if @remove_prefix @removed_prefix_string = @remove_prefix + '.' @removed_length = @removed_prefix_string.length end if @add_prefix @added_prefix_string = @add_prefix + '.' end @respawns = if @child_respawn.nil? || (@child_respawn == 'none') || (@child_respawn == '0') 0 elsif (@child_respawn == 'inf') || (@child_respawn == '-1') -1 elsif /^\d+$/.match?(@child_respawn) @child_respawn.to_i else raise ConfigError, "child_respawn option argument invalid: none(or 0), inf(or -1) or positive number" end @suppress_error_log_interval ||= 0 @next_log_time = Time.now.to_i end def multi_workers_ready? true end ExecutedProcess = Struct.new(:mutex, :pid, :respawns, :readio, :writeio) def start super @children_mutex = Mutex.new @children = [] @rr = 0 exit_callback = ->(status){ c = @children.find{|child| child.pid == status.pid } if c unless self.stopped? log.warn "child process exits with error code", code: status.to_i, status: status.exitstatus, signal: status.termsig end c.mutex.synchronize do (c.writeio && c.writeio.close) rescue nil (c.readio && c.readio.close) rescue nil c.pid = c.readio = c.writeio = nil end end } child_process_callback = ->(index, readio, writeio){ pid = child_process_id c = @children[index] writeio.sync = true c.mutex.synchronize do c.pid = pid c.respawns = @respawns c.readio = readio c.writeio = writeio end run(readio) } execute_child_process = ->(index){ child_process_execute("out_exec_filter_child#{index}".to_sym, @command, on_exit_callback: exit_callback) do |readio, writeio| child_process_callback.call(index, readio, writeio) end } @children_mutex.synchronize do @num_children.times do |i| @children << ExecutedProcess.new(Mutex.new, nil, 0, nil, nil) execute_child_process.call(i) end end if @respawns != 0 thread_create(:out_exec_filter_respawn_monitor) do while thread_current_running? @children.each_with_index do |c, i| if c.mutex && c.mutex.synchronize{ c.pid.nil? && c.respawns != 0 } respawns = c.mutex.synchronize do c.respawns -= 1 if c.respawns > 0 c.respawns end log.info "respawning child process", num: i, respawn_counter: respawns execute_child_process.call(i) end end sleep 0.2 end end end end def terminate @children = [] super end def tag_remove_prefix(tag) if @remove_prefix if ((tag[0, @removed_length] == @removed_prefix_string) && (tag.length > @removed_length)) || (tag == @removed_prefix_string) tag = tag[@removed_length..-1] || '' end end tag end NEWLINE = "\n" def format(tag, time, record) tag = tag_remove_prefix(tag) record = inject_values_to_record(tag, time, record) if @formatter.formatter_type == :text_per_line @formatter.format(tag, time, record).chomp + NEWLINE else @formatter.format(tag, time, record) end end def write(chunk) try_times = 0 while true r = @rr = (@rr + 1) % @children.length if @children[r].pid && writeio = @children[r].writeio chunk.write_to(writeio) break end try_times += 1 raise "no healthy child processes exist" if try_times >= @children.length end end def run(io) io.set_encoding(Encoding::ASCII_8BIT) case when @parser.implement?(:parse_io) @parser.parse_io(io, &method(:on_record)) when @parser.implement?(:parse_partial_data) until io.eof? @parser.parse_partial_data(io.readpartial(@read_block_size), &method(:on_record)) end when @parser.parser_type == :text_per_line io.each_line do |line| @parser.parse(line.chomp, &method(:on_record)) end else @parser.parse(io.read, &method(:on_record)) end end def on_record(time, record) tag = extract_tag_from_record(record) tag = @added_prefix_string + tag if tag && @add_prefix tag ||= @tag time ||= extract_time_from_record(record) || Fluent::EventTime.now router.emit(tag, time, record) rescue => e if @suppress_error_log_interval == 0 || Time.now.to_i > @next_log_time log.error "exec_filter failed to emit", record: Yajl.dump(record), error: e log.error_backtrace e.backtrace @next_log_time = Time.now.to_i + @suppress_error_log_interval end router.emit_error_event(tag, time, record, e) if tag && time && record end end end