#
# Fluentd
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'fileutils'
require 'json'
require 'open3'
require 'pathname'
require 'tmpdir'

require 'fluent/config'
require 'fluent/counter'
require 'fluent/env'
require 'fluent/engine'
require 'fluent/error'
require 'fluent/log'
require 'fluent/plugin'
require 'fluent/rpc'
require 'fluent/system_config'
require 'fluent/msgpack_factory'
require 'fluent/variable_store'
require 'serverengine'

if Fluent.windows?
  require 'win32/ipc'
  require 'win32/event'
end

module Fluent
  # Supervisor-side behaviour handed to ServerEngine.create (see Supervisor#supervise).
  # Relies on `config`, `stop`, `@monitors` and `super` being supplied by the object
  # this module is mixed into.
  module ServerModule
    # Prepares supervisor resources: the lock directory, the optional RPC server,
    # signal/event handlers, the optional counter server and the shared socket.
    def before_run
      @fluentd_conf = config[:fluentd_conf]
      @rpc_endpoint = nil
      @rpc_server = nil
      @counter = nil

      @fluentd_lock_dir = Dir.mktmpdir("fluentd-lock-")
      ENV['FLUENTD_LOCK_DIR'] = @fluentd_lock_dir

      if config[:rpc_endpoint]
        @rpc_endpoint = config[:rpc_endpoint]
        @enable_get_dump = config[:enable_get_dump]
        run_rpc_server
      end

      if Fluent.windows?
        install_windows_event_handler
      else
        install_supervisor_signal_handlers
      end

      if counter = config[:counter_server]
        run_counter_server(counter)
      end

      if config[:disable_shared_socket]
        $log.info "shared socket for multiple workers is disabled"
      else
        server = ServerEngine::SocketManager::Server.open
        ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = server.path.to_s
      end
    end

    # Tears down what before_run started, then removes the lock directory and
    # the socket manager path.
    def after_run
      stop_windows_event_thread if Fluent.windows?
      stop_rpc_server if @rpc_endpoint
      stop_counter_server if @counter
      cleanup_lock_dir
      Fluent::Supervisor.cleanup_resources
    end

    # Deletes the "fluentd-*.lock" files in the lock directory and then the directory itself.
    def cleanup_lock_dir
      FileUtils.rm(Dir.glob(File.join(@fluentd_lock_dir, "fluentd-*.lock")))
      FileUtils.rmdir(@fluentd_lock_dir)
    end

    # Creates the RPC server on @rpc_endpoint, mounts the built-in endpoints and starts it.
    # '/api/config.getDump' is mounted only when @enable_get_dump is truthy.
    def run_rpc_server
      @rpc_server = RPC::Server.new(@rpc_endpoint, $log)

      # built-in RPC for signals
      @rpc_server.mount_proc('/api/processes.interruptWorkers') { |req, res|
        $log.debug "fluentd RPC got /api/processes.interruptWorkers request"
        Process.kill :INT, Process.pid
        nil
      }
      @rpc_server.mount_proc('/api/processes.killWorkers') { |req, res|
        $log.debug "fluentd RPC got /api/processes.killWorkers request"
        Process.kill :TERM, Process.pid
        nil
      }
      @rpc_server.mount_proc('/api/processes.flushBuffersAndKillWorkers') { |req, res|
        $log.debug "fluentd RPC got /api/processes.flushBuffersAndKillWorkers request"
        if Fluent.windows?
          supervisor_sigusr1_handler
          stop(true)
        else
          Process.kill :USR1, Process.pid
          Process.kill :TERM, Process.pid
        end
        nil
      }
      @rpc_server.mount_proc('/api/plugins.flushBuffers') { |req, res|
        $log.debug "fluentd RPC got /api/plugins.flushBuffers request"
        if Fluent.windows?
          supervisor_sigusr1_handler
        else
          Process.kill :USR1, Process.pid
        end
        nil
      }
      @rpc_server.mount_proc('/api/config.reload') { |req, res|
        $log.debug "fluentd RPC got /api/config.reload request"
        if Fluent.windows?
          # restart worker with auto restarting by killing
          kill_worker
        else
          Process.kill :HUP, Process.pid
        end
        nil
      }
      @rpc_server.mount_proc('/api/config.dump') { |req, res|
        $log.debug "fluentd RPC got /api/config.dump request"
        $log.info "dump in-memory config"
        supervisor_dump_config_handler
        nil
      }

      @rpc_server.mount_proc('/api/config.gracefulReload') { |req, res|
        $log.debug "fluentd RPC got /api/config.gracefulReload request"
        if Fluent.windows?
          supervisor_sigusr2_handler
        else
          Process.kill :USR2, Process.pid
        end

        nil
      }

      @rpc_server.mount_proc('/api/config.getDump') { |req, res|
        $log.debug "fluentd RPC got /api/config.getDump request"
        $log.info "get dump in-memory config via HTTP"
        res.body = supervisor_get_dump_config_handler
        [nil, nil, res]
      } if @enable_get_dump

      @rpc_server.start
    end

    # Shuts down the RPC server created by run_rpc_server.
    def stop_rpc_server
      @rpc_server.shutdown
    end

    # Builds and starts the counter server from the given counter configuration
    # (reads its scope, bind, port and backup_path).
    def run_counter_server(counter_conf)
      @counter = Fluent::Counter::Server.new(
        counter_conf.scope,
        {host: counter_conf.bind, port: counter_conf.port, log: $log, path: counter_conf.backup_path}
      )
      @counter.start
    end

    # Stops the counter server started by run_counter_server.
    def stop_counter_server
      @counter.stop
    end

    # Traps HUP, USR1 and USR2 in the supervisor process. No-op on Windows.
    def install_supervisor_signal_handlers
      return if Fluent.windows?

      trap :HUP do
        $log.debug "fluentd supervisor process get SIGHUP"
        supervisor_sighup_handler
      end

      trap :USR1 do
        $log.debug "fluentd supervisor process get SIGUSR1"
        supervisor_sigusr1_handler
      end

      trap :USR2 do
        $log.debug 'fluentd supervisor process got SIGUSR2'
        supervisor_sigusr2_handler
      end
    end

    if Fluent.windows?
      # Override some methods of ServerEngine::MultiSpawnWorker
      # Since Fluentd's Supervisor doesn't use ServerEngine's HUP, USR1 and USR2
      # handlers (see install_supervisor_signal_handlers), they should be
      # disabled also on Windows, just send commands to workers instead.

      def restart(graceful)
        @monitors.each do |m|
          m.send_command(graceful ? "GRACEFUL_RESTART\n" : "IMMEDIATE_RESTART\n")
        end
      end

      def reload
        @monitors.each do |m|
          m.send_command("RELOAD\n")
        end
      end
    end

    # Starts a thread that waits on named Win32 events and dispatches them to the
    # matching supervisor handler. No-op unless running on Windows.
    def install_windows_event_handler
      return unless Fluent.windows?

      @pid_signame = "fluentd_#{Process.pid}"
      @signame = config[:signame]

      Thread.new do
        ipc = Win32::Ipc.new(nil)
        events = [
          {win32_event: Win32::Event.new("#{@pid_signame}_STOP_EVENT_THREAD"), action: :stop_event_thread},
          {win32_event: Win32::Event.new("#{@pid_signame}"), action: :stop},
          {win32_event: Win32::Event.new("#{@pid_signame}_HUP"), action: :hup},
          {win32_event: Win32::Event.new("#{@pid_signame}_USR1"), action: :usr1},
          {win32_event: Win32::Event.new("#{@pid_signame}_USR2"), action: :usr2},
          {win32_event: Win32::Event.new("#{@pid_signame}_CONT"), action: :cont},
        ]
        if @signame
          signame_events = [
            {win32_event: Win32::Event.new("#{@signame}"), action: :stop},
            {win32_event: Win32::Event.new("#{@signame}_HUP"), action: :hup},
            {win32_event: Win32::Event.new("#{@signame}_USR1"), action: :usr1},
            {win32_event: Win32::Event.new("#{@signame}_USR2"), action: :usr2},
            {win32_event: Win32::Event.new("#{@signame}_CONT"), action: :cont},
          ]
          events.concat(signame_events)
        end
        begin
          loop do
            infinite = 0xFFFFFFFF
            ipc_idx = ipc.wait_any(events.map {|e| e[:win32_event]}, infinite)
            event_idx = ipc_idx - 1

            if event_idx >= 0 && event_idx < events.length
              $log.debug("Got Win32 event \"#{events[event_idx][:win32_event].name}\"")
            else
              $log.warn("Unexpected return value of Win32::Ipc#wait_any: #{ipc_idx}")
              # Do not dispatch on an invalid index: a negative index would silently
              # select an event from the end of the list, and an index past the end
              # would raise on nil and kill this thread.
              next
            end

            case events[event_idx][:action]
            when :stop
              stop(true)
            when :hup
              supervisor_sighup_handler
            when :usr1
              supervisor_sigusr1_handler
            when :usr2
              supervisor_sigusr2_handler
            when :cont
              supervisor_dump_handler_for_windows
            when :stop_event_thread
              break
            end
          end
        ensure
          events.each { |event| event[:win32_event].close }
        end
      end
    end

    # Signals the "<pid_signame>_STOP_EVENT_THREAD" event so the event loop started by
    # install_windows_event_handler breaks out. Does nothing on other platforms.
    def stop_windows_event_thread
      if Fluent.windows?
        ev = Win32::Event.open("#{@pid_signame}_STOP_EVENT_THREAD")
        ev.set
        ev.close
      end
    end

    # HUP: kill the workers (see kill_worker).
    def supervisor_sighup_handler
      kill_worker
    end

    # USR1: reopen the supervisor log and forward USR1 to the workers.
    def supervisor_sigusr1_handler
      reopen_log
      send_signal_to_workers(:USR1)
    end

    # USR2: rebuild and validate the config in a separate thread; on success reopen
    # the log, forward USR2 to the workers and remember the new config text.
    # Any failure is logged and the previous @fluentd_conf is kept.
    def supervisor_sigusr2_handler
      conf = nil
      t = Thread.new do
        $log.info 'Reloading new config'

        # Validate that loading config is valid at first
        conf = Fluent::Config.build(
          config_path: config[:config_path],
          encoding: config[:conf_encoding],
          additional_config: config[:inline_config],
          use_v1_config: config[:use_v1_config],
        )

        Fluent::VariableStore.try_to_reset do
          Fluent::Engine.reload_config(conf, supervisor: true)
        end
      end

      t.report_on_exception = false # Error is handled by myself
      t.join

      reopen_log
      send_signal_to_workers(:USR2)
      @fluentd_conf = conf.to_s
    rescue => e
      $log.error "Failed to reload config file: #{e}"
    end

    def supervisor_dump_handler_for_windows
      # As for UNIX-like, SIGCONT signal to each process makes the process output its dump-file,
      # and it is implemented before the implementation of the function for Windows.
      # It is possible to trap SIGCONT and handle it here also on UNIX-like,
      # but for backward compatibility, this handler is currently for a Windows-only.
      raise "[BUG] This function is for Windows ONLY." unless Fluent.windows?

      Thread.new do
        begin
          FluentSigdump.dump_windows
        rescue => e
          $log.error "failed to dump: #{e}"
        end
      end

      send_signal_to_workers(:CONT)
    rescue => e
      $log.error "failed to dump: #{e}"
    end

    # Clears the recorded worker pids and kills each of them
    # (KILL on Windows, TERM elsewhere).
    def kill_worker
      if config[:worker_pid]
        pids = config[:worker_pid].clone
        config[:worker_pid].clear
        pids.each_value do |pid|
          if Fluent.windows?
            Process.kill :KILL, pid
          else
            Process.kill :TERM, pid
          end
        end
      end
    end

    # Writes the in-memory config text to the log at info level.
    def supervisor_dump_config_handler
      $log.info @fluentd_conf
    end

    # Returns the in-memory config text wrapped in a hash under :conf.
    def supervisor_get_dump_config_handler
      { conf: @fluentd_conf }
    end

    # Delegates to the inherited dump unless @stop is set.
    def dump
      super unless @stop
    end

    private

    def reopen_log
      if $log
        # Creating new thread due to mutex can't lock
        # in main thread during trap context
        Thread.new do
          $log.reopen!
        end
      end
    end

    # Delivers `signal` to every recorded worker: as a command on Windows,
    # as a real signal elsewhere. No-op when no worker pid is recorded.
    def send_signal_to_workers(signal)
      return unless config[:worker_pid]

      if Fluent.windows?
        send_command_to_workers(signal)
      else
        config[:worker_pid].each_value do |pid|
          # don't rescue Errno::ESRCH here (invalid status)
          Process.kill(signal, pid)
        end
      end
    end

    def send_command_to_workers(signal)
      # Use ServerEngine's CommandSender on Windows
      case signal
      when :HUP
        restart(false)
      when :USR1
        restart(true)
      when :USR2
        reload
      when :CONT
        dump_all_windows_workers
      end
    end

    # Sends the "DUMP" command to every monitor.
    def dump_all_windows_workers
      @monitors.each do |m|
        m.send_command("DUMP\n")
      end
    end
  end

  # Worker-side behaviour handed to ServerEngine.create (see Supervisor#supervise).
  module WorkerModule
    # Spawns config[:main_cmd] with SERVERENGINE_WORKER_ID set to this worker's id.
    def spawn(process_manager)
      main_cmd = config[:main_cmd]
      env = {
        'SERVERENGINE_WORKER_ID' => @worker_id.to_i.to_s,
      }
      @pm = process_manager.spawn(env, *main_cmd)
    end

    # Records the spawned process pid under this worker's id in config[:worker_pid].
    def after_start
      (config[:worker_pid] ||= {})[@worker_id] = @pm.pid
    end

    # Delegates to the inherited dump unless @stop is set.
    def dump
      super unless @stop
    end
  end

  class Supervisor
    # Builds the ServerEngine configuration hash from the string-keyed `params`
    # assembled in #supervise.
    def self.serverengine_config(params = {})
      # ServerEngine's "daemonize" option is boolean, and path of pid file is brought by "pid_path"
      pid_path = params['daemonize']
      daemonize = !!params['daemonize']

      se_config = {
        worker_type: 'spawn',
        workers: params['workers'],
        log_stdin: false,
        log_stdout: false,
        log_stderr: false,
        enable_heartbeat: true,
        auto_heartbeat: false,
        unrecoverable_exit_codes: [2],
        stop_immediately_at_unrecoverable_exit: true,
        root_dir: params['root_dir'],
        logger: $log,
        log: $log.out,
        log_level: params['log_level'],
        chuser: params['chuser'],
        chgroup: params['chgroup'],
        chumask: params['chumask'],
        daemonize: daemonize,
        rpc_endpoint: params['rpc_endpoint'],
        counter_server: params['counter_server'],
        enable_get_dump: params['enable_get_dump'],
        windows_daemon_cmdline: [ServerEngine.ruby_bin_path,
                                 File.join(File.dirname(__FILE__), 'daemon.rb'),
                                 ServerModule.name,
                                 WorkerModule.name,
                                 JSON.dump(params)],
        command_sender: Fluent.windows? ? "pipe" : "signal",
        config_path: params['fluentd_conf_path'],
        fluentd_conf: params['fluentd_conf'],
        conf_encoding: params['conf_encoding'],
        inline_config: params['inline_config'],
        main_cmd: params['main_cmd'],
        signame: params['signame'],
        disable_shared_socket: params['disable_shared_socket'],
        restart_worker_interval: params['restart_worker_interval'],
      }
      se_config[:pid_path] = pid_path if daemonize

      se_config
    end

    # Default values that #initialize merges the command line options over.
    def self.default_options
      {
        config_path: Fluent::DEFAULT_CONFIG_PATH,
        plugin_dirs: [Fluent::DEFAULT_PLUGIN_DIR],
        log_level: Fluent::Log::LEVEL_INFO,
        log_path: nil,
        daemonize: nil,
        libs: [],
        setup_path: nil,
        chuser: nil,
        chgroup: nil,
        chumask: "0",
        root_dir: nil,
        suppress_interval: 0,
        suppress_repeated_stacktrace: true,
        ignore_repeated_log_interval: nil,
        without_source: nil,
        enable_input_metrics: nil,
        enable_size_metrics: nil,
        use_v1_config: true,
        strict_config_value: nil,
        supervise: true,
        standalone_worker: false,
        signame: nil,
        conf_encoding: 'utf-8',
        disable_shared_socket: nil,
        config_file_type: :guess,
      }
    end

    # On non-Windows platforms, removes the path stored in
    # SERVERENGINE_SOCKETMANAGER_PATH if that variable is set.
    def self.cleanup_resources
      unless Fluent.windows?
        if ENV.has_key?('SERVERENGINE_SOCKETMANAGER_PATH')
          FileUtils.rm_f(ENV['SERVERENGINE_SOCKETMANAGER_PATH'])
        end
      end
    end

    # @param cl_opt [Hash] command line options; merged over default_options.
    def initialize(cl_opt)
      @cl_opt = cl_opt
      opt = self.class.default_options.merge(cl_opt)

      @config_file_type = opt[:config_file_type]
      @daemonize = opt[:daemonize]
      @standalone_worker= opt[:standalone_worker]
      @config_path = opt[:config_path]
      @inline_config = opt[:inline_config]
      @use_v1_config = opt[:use_v1_config]
      @conf_encoding = opt[:conf_encoding]
      @log_path = opt[:log_path]
      @show_plugin_config = opt[:show_plugin_config]
      @libs = opt[:libs]
      @plugin_dirs = opt[:plugin_dirs]
      @chgroup = opt[:chgroup]
      @chuser = opt[:chuser]
      @chumask = opt[:chumask]
      @signame = opt[:signame]

      # TODO: `@log_rotate_age` and `@log_rotate_size` should be removed
      # since it should be merged with SystemConfig in `build_system_config()`.
      # We should always use `system_config.log.rotate_age` and `system_config.log.rotate_size`.
      # However, currently, there is a bug that `system_config.log` parameters
      # are not in `Fluent::SystemConfig::SYSTEM_CONFIG_PARAMETERS`, and these
      # parameters are not merged in `build_system_config()`.
      # Until we fix the bug of `Fluent::SystemConfig`, we need to use these instance variables.
      @log_rotate_age = opt[:log_rotate_age]
      @log_rotate_size = opt[:log_rotate_size]

      @finished = false
    end

    # Validates the worker count and root directory, runs the configuration once in
    # supervisor mode, then either exits (dry run) or starts supervising.
    # Expects #configure to have been called first (uses @system_config and @conf).
    def run_supervisor(dry_run: false)
      if dry_run
        $log.info "starting fluentd-#{Fluent::VERSION} as dry run mode", ruby: RUBY_VERSION
      end

      if @system_config.workers < 1
        raise Fluent::ConfigError, "invalid number of workers (must be > 0):#{@system_config.workers}"
      end

      root_dir = @system_config.root_dir
      if root_dir
        if File.exist?(root_dir)
          unless Dir.exist?(root_dir)
            raise Fluent::InvalidRootDirectory, "non directory entry exists:#{root_dir}"
          end
        else
          begin
            FileUtils.mkdir_p(root_dir, mode: @system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION)
          rescue => e
            raise Fluent::InvalidRootDirectory, "failed to create root directory:#{root_dir}, #{e.inspect}"
          end
        end
      end

      begin
        ServerEngine::Privilege.change(@chuser, @chgroup)
        MessagePackFactory.init(enable_time_support: @system_config.enable_msgpack_time_support)
        Fluent::Engine.init(@system_config, supervisor_mode: true)
        Fluent::Engine.run_configure(@conf, dry_run: dry_run)
      rescue Fluent::ConfigError => e
        $log.error 'config error', file: @config_path, error: e
        $log.debug_backtrace
        exit!(1)
      end

      if dry_run
        $log.info 'finished dry run mode'
        exit 0
      else
        supervise
      end
    end

    # Returns a string-keyed summary of the main paths this supervisor runs with.
    def options
      {
        'config_path' => @config_path,
        'pid_file' => @daemonize,
        'plugin_dirs' => @plugin_dirs,
        'log_path' => @log_path,
        'root_dir' => @system_config.root_dir,
      }
    end

    # Runs the Fluentd engine in this process as a worker (or as a standalone worker
    # when started without a supervisor). Does not return normally: it ends in `exit`.
    def run_worker
      # Guard on the configured process name; `@process_name` is never assigned in
      # this class, so the previous guard made this line dead code.
      Process.setproctitle("worker:#{@system_config.process_name}") if @system_config.process_name

      if @standalone_worker && @system_config.workers != 1
        raise Fluent::ConfigError, "invalid number of workers (must be 1 or unspecified) with --no-supervisor: #{@system_config.workers}"
      end

      install_main_process_signal_handlers

      # This is the only log message for @standalone_worker
      $log.info "starting fluentd-#{Fluent::VERSION} without supervision", pid: Process.pid, ruby: RUBY_VERSION if @standalone_worker

      main_process do
        create_socket_manager if @standalone_worker
        if @standalone_worker
          ServerEngine::Privilege.change(@chuser, @chgroup)
          File.umask(@chumask.to_i(8))
        end
        MessagePackFactory.init(enable_time_support: @system_config.enable_msgpack_time_support)
        Fluent::Engine.init(@system_config)
        Fluent::Engine.run_configure(@conf)
        Fluent::Engine.run
        self.class.cleanup_resources if @standalone_worker
        exit 0
      end
    end

    # Sets up the global logger, builds @conf and @system_config, requires the
    # requested libs and registers existing plugin directories.
    def configure(supervisor: false)
      setup_global_logger(supervisor: supervisor)

      if @show_plugin_config
        show_plugin_config
      end

      if @inline_config == '-'
        $log.warn('the value "-" for `inline_config` is deprecated. See https://github.com/fluent/fluentd/issues/2711')
        @inline_config = STDIN.read
      end

      @conf = Fluent::Config.build(
        config_path: @config_path,
        encoding: @conf_encoding,
        additional_config: @inline_config,
        use_v1_config: @use_v1_config,
        type: @config_file_type,
      )
      @system_config = build_system_config(@conf)

      $log.info :supervisor, 'parsing config file is succeeded', path: @config_path

      @libs.each do |lib|
        require lib
      end

      @plugin_dirs.each do |dir|
        if Dir.exist?(dir)
          dir = File.expand_path(dir)
          Fluent::Plugin.add_plugin_dir(dir)
        end
      end

      if supervisor
        # plugins / configuration dumps
        Gem::Specification.find_all.select { |x| x.name =~ /^fluent(d|-(plugin|mixin)-.*)$/ }.each do |spec|
          $log.info("gem '#{spec.name}' version '#{spec.version}'")
        end
      end
    end

    private

    # Creates the global `$log`, choosing the log device from @log_path and the
    # rotation settings, and applying ownership/permission changes when configured.
    def setup_global_logger(supervisor: false)
      if supervisor
        worker_id = 0
        process_type = :supervisor
      else
        worker_id = ENV['SERVERENGINE_WORKER_ID'].to_i
        process_type = case
                       when @standalone_worker then :standalone
                       when worker_id == 0 then :worker0
                       else :workers
                       end
      end

      # Parse configuration immediately to initialize logger in early stage.
      # Since we can't confirm the log messages in this parsing process,
      # we must parse the config again after initializing logger.
      conf = Fluent::Config.build(
        config_path: @config_path,
        encoding: @conf_encoding,
        additional_config: @inline_config,
        use_v1_config: @use_v1_config,
        type: @config_file_type,
      )
      system_config = build_system_config(conf)

      # TODO: we should remove this logic. This merging process should be done
      # in `build_system_config()`.
      @log_rotate_age ||= system_config.log.rotate_age
      @log_rotate_size ||= system_config.log.rotate_size

      rotate = @log_rotate_age || @log_rotate_size
      actual_log_path = @log_path

      # We need to prepare a unique path for each worker since Windows locks files.
      if Fluent.windows? && rotate && @log_path && @log_path != "-"
        actual_log_path = Fluent::Log.per_process_path(@log_path, process_type, worker_id)
      end

      if actual_log_path && actual_log_path != "-"
        FileUtils.mkdir_p(File.dirname(actual_log_path)) unless File.exist?(actual_log_path)
        if rotate
          logdev = Fluent::LogDeviceIO.new(
            actual_log_path,
            shift_age: @log_rotate_age,
            shift_size: @log_rotate_size,
          )
        else
          logdev = File.open(actual_log_path, "a")
        end

        if @chuser || @chgroup
          chuid = @chuser ? ServerEngine::Privilege.get_etc_passwd(@chuser).uid : nil
          chgid = @chgroup ? ServerEngine::Privilege.get_etc_group(@chgroup).gid : nil
          File.chown(chuid, chgid, actual_log_path)
        end

        if system_config.dir_permission
          File.chmod(system_config.dir_permission || Fluent::DEFAULT_DIR_PERMISSION, File.dirname(actual_log_path))
        end
      else
        logdev = STDOUT
      end

      $log = Fluent::Log.new(
        # log_level: subtract 1 to match serverengine daemon logger side logging severity.
        ServerEngine::DaemonLogger.new(logdev, log_level: system_config.log_level - 1),
        path: actual_log_path,
        process_type: process_type,
        worker_id: worker_id,
        format: system_config.log.format,
        time_format: system_config.log.time_format,
        suppress_repeated_stacktrace: system_config.suppress_repeated_stacktrace,
        ignore_repeated_log_interval: system_config.ignore_repeated_log_interval,
        ignore_same_log_interval: system_config.ignore_same_log_interval,
      )
      $log.enable_color(false) if actual_log_path
      $log.enable_debug if system_config.log_level <= Fluent::Log::LEVEL_DEBUG

      $log.info "init #{process_type} logger",
                path: actual_log_path,
                rotate_age: @log_rotate_age,
                rotate_size: @log_rotate_size
    end

    # Opens a ServerEngine socket manager server and publishes its path through
    # SERVERENGINE_SOCKETMANAGER_PATH.
    def create_socket_manager
      server = ServerEngine::SocketManager::Server.open
      ENV['SERVERENGINE_SOCKETMANAGER_PATH'] = server.path.to_s
    end

    # Logs the deprecation notice for the show_plugin_config option and exits.
    def show_plugin_config
      name, type = @show_plugin_config.split(":") # input:tail
      $log.info "show_plugin_config option is deprecated. Use fluent-plugin-config-format --format=txt #{name} #{type}"
      exit 0
    end

    # Builds the worker spawn command and the ServerEngine parameters, then runs
    # ServerEngine with ServerModule and WorkerModule.
    def supervise
      Process.setproctitle("supervisor:#{@system_config.process_name}") if @system_config.process_name
      $log.info "starting fluentd-#{Fluent::VERSION}", pid: Process.pid, ruby: RUBY_VERSION

      fluentd_spawn_cmd = build_spawn_command
      $log.info "spawn command to main: ", cmdline: fluentd_spawn_cmd

      params = {
        'main_cmd' => fluentd_spawn_cmd,
        'daemonize' => @daemonize,
        'inline_config' => @inline_config,
        'chuser' => @chuser,
        'chgroup' => @chgroup,
        'fluentd_conf_path' => @config_path,
        'fluentd_conf' => @conf.to_s,
        'use_v1_config' => @use_v1_config,
        'conf_encoding' => @conf_encoding,
        'signame' => @signame,

        'workers' => @system_config.workers,
        'root_dir' => @system_config.root_dir,
        'log_level' => @system_config.log_level,
        'rpc_endpoint' => @system_config.rpc_endpoint,
        'enable_get_dump' => @system_config.enable_get_dump,
        'counter_server' => @system_config.counter_server,
        'disable_shared_socket' => @system_config.disable_shared_socket,
        'restart_worker_interval' => @system_config.restart_worker_interval,
      }

      se = ServerEngine.create(ServerModule, WorkerModule) {
        # Note: This is called only at the initialization of ServerEngine, since
        # Fluentd overwrites all related SIGNAL(HUP,USR1,USR2) and have own reloading feature.
        Fluent::Supervisor.serverengine_config(params)
      }

      se.run
    end

    def install_main_process_signal_handlers
      # Fluentd worker process (worker of ServerEngine) don't use code in serverengine to set signal handlers,
      # because it does almost nothing.
      # This method is the only method to set signal handlers in Fluentd worker process.

      # When user use Ctrl + C not SIGINT, SIGINT is sent to all process in same process group.
      # ServerEngine server process will send SIGTERM to child(spawned) processes by that SIGINT, so
      # worker process SHOULD NOT do anything with SIGINT, SHOULD just ignore.
      trap :INT do
        $log.debug "fluentd main process get SIGINT"

        # When Fluentd is launched without supervisor, worker should handle ctrl-c by itself
        if @standalone_worker
          @finished = true
          $log.debug "getting start to shutdown main process"
          Fluent::Engine.stop
        end
      end

      trap :TERM do
        $log.debug "fluentd main process get SIGTERM"
        unless @finished
          @finished = true
          $log.debug "getting start to shutdown main process"
          Fluent::Engine.stop
        end
      end

      if Fluent.windows?
        install_main_process_command_handlers
      else
        trap :USR1 do
          flush_buffer
        end

        trap :USR2 do
          reload_config
        end

        trap :CONT do
          dump_non_windows
        end
      end
    end

    # Reads newline-terminated commands from the original stdin in a background
    # thread and dispatches them. Stdin itself is reopened on the null device.
    def install_main_process_command_handlers
      command_pipe = $stdin.dup
      $stdin.reopen(File::NULL, "rb")
      command_pipe.binmode
      command_pipe.sync = true

      Thread.new do
        loop do
          cmd = command_pipe.gets
          break unless cmd

          # Use the non-bang chomp: `chomp!` returns nil when the line has no
          # trailing newline, which made a valid final command look unknown.
          cmd = cmd.chomp

          case cmd
          when "GRACEFUL_STOP", "IMMEDIATE_STOP"
            $log.debug "fluentd main process get #{cmd} command"
            @finished = true
            $log.debug "getting start to shutdown main process"
            Fluent::Engine.stop
            break
          when "GRACEFUL_RESTART"
            $log.debug "fluentd main process get #{cmd} command"
            flush_buffer
          when "RELOAD"
            $log.debug "fluentd main process get #{cmd} command"
            reload_config
          when "DUMP"
            $log.debug "fluentd main process get #{cmd} command"
            dump_windows
          else
            $log.warn "fluentd main process get unknown command [#{cmd}]"
          end
        end
      end
    end

    # Reopens the log and force-flushes buffered events in a background thread.
    def flush_buffer
      # Creating new thread due to mutex can't lock
      # in main thread during trap context
      Thread.new do
        begin
          $log.debug "fluentd main process get SIGUSR1"
          $log.info "force flushing buffered events"
          $log.reopen!
          Fluent::Engine.flush!
          $log.debug "flushing thread: flushed"
        rescue Exception => e
          $log.warn "flushing thread error: #{e}"
        end
      end
    end

    # Rebuilds the config and reloads the engine in a background thread.
    # @conf is replaced only when the reload succeeds.
    def reload_config
      Thread.new do
        $log.debug('worker got SIGUSR2')

        begin
          conf = Fluent::Config.build(
            config_path: @config_path,
            encoding: @conf_encoding,
            additional_config: @inline_config,
            use_v1_config: @use_v1_config,
            type: @config_file_type,
          )

          Fluent::VariableStore.try_to_reset do
            Fluent::Engine.reload_config(conf)
          end
        rescue => e
          # it is guaranteed that config file is valid by supervisor side. but it's not atomic because of using signals to communicate between worker and super
          # So need this rescue code
          $log.error("failed to reload config: #{e}")
          next
        end

        @conf = conf
      end
    end

    # Runs Sigdump.dump unless shutdown has started; failures are logged.
    def dump_non_windows
      begin
        Sigdump.dump unless @finished
      rescue => e
        $log.error("failed to dump: #{e}")
      end
    end

    # Runs FluentSigdump.dump_windows in a background thread; failures are logged.
    def dump_windows
      Thread.new do
        begin
          FluentSigdump.dump_windows
        rescue => e
          $log.error("failed to dump: #{e}")
        end
      end
    end

    # Yields `$log`, and additionally yields a STDOUT-backed logger when `$log`
    # is not already writing to stdout.
    def logging_with_console_output
      yield $log
      unless $log.stdout?
        logger = ServerEngine::DaemonLogger.new(STDOUT)
        log = Fluent::Log.new(logger)
        log.level = @system_config.log_level
        console = log.enable_debug
        yield console
      end
    end

    # Sets the worker process title, runs the block, logs any error it raises and
    # terminates with exit code 2 for unrecoverable errors and 1 otherwise.
    def main_process(&block)
      if @system_config.process_name
        if @system_config.workers > 1
          Process.setproctitle("worker:#{@system_config.process_name}#{ENV['SERVERENGINE_WORKER_ID']}")
        else
          Process.setproctitle("worker:#{@system_config.process_name}")
        end
      end

      unrecoverable_error = false

      begin
        block.call
      rescue Fluent::ConfigError => e
        logging_with_console_output do |log|
          log.error "config error", file: @config_path, error: e
          log.debug_backtrace
        end
        unrecoverable_error = true
      rescue Fluent::UnrecoverableError => e
        logging_with_console_output do |log|
          log.error e.message, error: e
          log.error_backtrace
        end
        unrecoverable_error = true
      rescue ScriptError => e # LoadError, NotImplementedError, SyntaxError
        logging_with_console_output do |log|
          if e.respond_to?(:path)
            log.error e.message, path: e.path, error: e
          else
            log.error e.message, error: e
          end
          log.error_backtrace
        end
        unrecoverable_error = true
      rescue => e
        logging_with_console_output do |log|
          log.error "unexpected error", error: e
          log.error_backtrace
        end
      end

      exit!(unrecoverable_error ? 2 : 1)
    end

    # Creates the SystemConfig for `conf` and overwrites it with the non-nil
    # command line options listed in SYSTEM_CONFIG_PARAMETERS.
    def build_system_config(conf)
      system_config = SystemConfig.create(conf, @cl_opt[:strict_config_value])
      # Prefer the options explicitly specified in the command line
      #
      # TODO: There is a bug that `system_config.log.rotate_age/rotate_size` are
      # not merged with the command line options since they are not in
      # `SYSTEM_CONFIG_PARAMETERS`.
      # We have to fix this bug.
      opt = {}
      Fluent::SystemConfig::SYSTEM_CONFIG_PARAMETERS.each do |param|
        if @cl_opt.key?(param) && !@cl_opt[param].nil?
          opt[param] = @cl_opt[param]
        end
      end
      system_config.overwrite_variables(**opt)
      system_config
    end

    RUBY_ENCODING_OPTIONS_REGEX = %r{\A(-E|--encoding=|--internal-encoding=|--external-encoding=)}.freeze

    # Assembles the command line used to spawn a worker: the ruby binary, the
    # RUBYOPT-derived options, the current script and its arguments, and
    # '--under-supervisor'. Exits if ruby rejects the assembled options.
    def build_spawn_command
      if ENV['TEST_RUBY_PATH']
        fluentd_spawn_cmd = [ENV['TEST_RUBY_PATH']]
      else
        fluentd_spawn_cmd = [ServerEngine.ruby_bin_path]
      end

      rubyopt = ENV['RUBYOPT']
      if rubyopt
        encodes, others = rubyopt.split(' ').partition { |e| e.match?(RUBY_ENCODING_OPTIONS_REGEX) }
        fluentd_spawn_cmd.concat(others)

        adopted_encodes = encodes.empty? ? ['-Eascii-8bit:ascii-8bit'] : encodes
        fluentd_spawn_cmd.concat(adopted_encodes)
      else
        fluentd_spawn_cmd << '-Eascii-8bit:ascii-8bit'
      end

      if @system_config.enable_jit
        $log.info "enable Ruby JIT for workers (--jit)"
        fluentd_spawn_cmd << '--jit'
      end

      # Adding `-h` so that it can avoid ruby's command blocking
      # e.g. `ruby -Eascii-8bit:ascii-8bit` will block. but `ruby -Eascii-8bit:ascii-8bit -h` won't.
      _, e, s = Open3.capture3(*fluentd_spawn_cmd, "-h")
      if s.exitstatus != 0
        $log.error('Invalid option is passed to RUBYOPT', command: fluentd_spawn_cmd, error: e)
        exit s.exitstatus
      end

      fluentd_spawn_cmd << $0
      fluentd_spawn_cmd += $fluentdargv
      fluentd_spawn_cmd << '--under-supervisor'

      fluentd_spawn_cmd
    end
  end

  module FluentSigdump
    # Writes a Sigdump report to a per-process file. Windows only.
    def self.dump_windows
      raise "[BUG] WindowsSigdump::dump is for Windows ONLY." unless Fluent.windows?

      # Sigdump outputs under `/tmp` dir without `SIGDUMP_PATH` specified,
      # but `/tmp` dir may not exist on Windows by default.
      # So use the systemroot-temp-dir instead.
      dump_filepath = ENV['SIGDUMP_PATH'].nil? || ENV['SIGDUMP_PATH'].empty? \
        ? "#{ENV['windir']}/Temp/fluentd-sigdump-#{Process.pid}.log"
        : get_path_with_pid(ENV['SIGDUMP_PATH'])

      require 'sigdump'
      Sigdump.dump(dump_filepath)

      $log.info "dump to #{dump_filepath}."
    end

    # Returns `raw_path` with "-<pid>" inserted before its extension.
    def self.get_path_with_pid(raw_path)
      path = Pathname.new(raw_path)
      path.sub_ext("-#{Process.pid}#{path.extname}").to_s
    end
  end
end