#
# Fluent
#
# Copyright (C) 2011 FURUHASHI Sadayuki
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#

require 'fluent/env'
require 'fluent/log'
require 'fluent/config'
require 'etc'
require 'fcntl' # Fcntl::F_SETFD / Fcntl::FD_CLOEXEC are used in start_daemonize

module Fluent
  # Runs fluentd under supervision: optionally daemonizes, forks a "main"
  # process that reads the configuration and runs Fluent::Engine, and
  # re-forks that process when it dies until a shutdown signal is received.
  class Supervisor
    # Looks up a passwd entry.
    #
    # user - String. When it is the decimal representation of an integer it
    #        is treated as a uid, otherwise as a login name.
    #
    # Returns whatever Etc.getpwuid / Etc.getpwnam return for that user.
    def self.get_etc_passwd(user)
      if user.to_i.to_s == user
        Etc.getpwuid(user.to_i)
      else
        Etc.getpwnam(user)
      end
    end

    # Looks up a group entry.
    #
    # group - String. When it is the decimal representation of an integer it
    #         is treated as a gid, otherwise as a group name.
    #
    # Returns whatever Etc.getgrgid / Etc.getgrnam return for that group.
    def self.get_etc_group(group)
      if group.to_i.to_s == group
        Etc.getgrgid(group.to_i)
      else
        Etc.getgrnam(group)
      end
    end

    # Creates the global logger ($log) and knows how to reopen its log file.
    class LoggerInitializer
      # path    - log file path; nil or "-" means STDOUT.
      # level   - log level passed to Fluent::Log.new.
      # chuser  - user (name or uid string) that should own the log file, or nil.
      # chgroup - group (name or gid string) that should own the log file, or nil.
      # opts    - option hash passed through to Fluent::Log.new.
      def initialize(path, level, chuser, chgroup, opts)
        @path = path
        @level = level
        @chuser = chuser
        @chgroup = chgroup
        @opts = opts
      end

      # Opens the log destination and assigns a new Fluent::Log to $log.
      def init
        if @path && @path != "-"
          @io = File.open(@path, "a")
          if @chuser || @chgroup
            # A nil id is passed to File.chown for whichever of user/group
            # was not requested.
            chuid = @chuser ? Supervisor.get_etc_passwd(@chuser).uid : nil
            chgid = @chgroup ? Supervisor.get_etc_group(@chgroup).gid : nil
            File.chown(chuid, chgid, @path)
          end
        else
          @io = STDOUT
        end

        $log = Fluent::Log.new(@io, @level, @opts)

        $log.enable_color(false) if @path
        $log.enable_debug if @level <= Fluent::Log::LEVEL_DEBUG
      end

      # Returns true when the logger writes to STDOUT.
      def stdout?
        @io == STDOUT
      end

      # Reopens the log file in append mode when logging to a file
      # (no-op for STDOUT). Returns self.
      def reopen!
        if @path && @path != "-"
          @io.reopen(@path, "a")
        end
        self
      end
    end

    # Returns a fresh hash of default option values for Supervisor.new.
    def self.default_options
      {
        :config_path => Fluent::DEFAULT_CONFIG_PATH,
        :plugin_dirs => [Fluent::DEFAULT_PLUGIN_DIR],
        :log_level => Fluent::Log::LEVEL_INFO,
        :log_path => nil,
        :daemonize => nil,
        :libs => [],
        :setup_path => nil,
        :chuser => nil,
        :chgroup => nil,
        :suppress_interval => 0,
        :suppress_repeated_stacktrace => false,
        :use_v1_config => false,
      }
    end

    # opt - option hash (see default_options for the recognized keys; also
    #       reads :inline_config, :dry_run and :suppress_config_dump).
    #
    # The configuration file is read here already, because a <system>
    # section in it may override some of the options (apply_system_config
    # merges into +opt+ in place).
    def initialize(opt)
      @daemonize = opt[:daemonize]
      @config_path = opt[:config_path]
      @inline_config = opt[:inline_config]
      @use_v1_config = opt[:use_v1_config]
      @log_path = opt[:log_path]
      @dry_run = opt[:dry_run]
      @libs = opt[:libs]
      @plugin_dirs = opt[:plugin_dirs]
      @chgroup = opt[:chgroup]
      @chuser = opt[:chuser]

      # Must run before the options below are read: it may overwrite them.
      apply_system_config(opt)

      @log_level = opt[:log_level]
      @suppress_interval = opt[:suppress_interval]
      @suppress_config_dump = opt[:suppress_config_dump]

      log_opts = {:suppress_repeated_stacktrace => opt[:suppress_repeated_stacktrace]}
      @log = LoggerInitializer.new(@log_path, @log_level, @chuser, @chgroup, log_opts)
      @finished = false
      @main_pid = nil
    end

    # Entry point: initializes logging, optionally performs a dry run or
    # daemonizes, then keeps a main process running until @finished is set
    # by a signal handler.
    def start
      require 'fluent/load'
      @log.init

      dry_run if @dry_run
      start_daemonize if @daemonize
      install_supervisor_signal_handlers
      until @finished
        supervise do
          read_config
          change_privilege
          init_engine
          install_main_process_signal_handlers
          run_configure
          finish_daemonize if @daemonize
          run_engine
          exit 0
        end
        $log.error "fluentd main process died unexpectedly. restarting." unless @finished
      end
    end

    # Returns a hash (string keys) describing the paths this supervisor uses.
    # Note that 'pid_file' is the value of the :daemonize option.
    def options
      {
        'config_path' => @config_path,
        'pid_file' => @daemonize,
        'plugin_dirs' => @plugin_dirs,
        'log_path' => @log_path
      }
    end

    private

    # Loads and configures everything without running the engine, then
    # exits: status 0 on success, status 1 (after logging) on error.
    def dry_run
      read_config
      change_privilege
      init_engine
      install_main_process_signal_handlers
      run_configure
      exit 0
    rescue => e
      $log.error "Dry run failed: #{e}"
      exit 1
    end

    # Detaches from the console with a double fork. The console process
    # blocks in wait_daemonize until the pipe is written or closed; the
    # grandchild continues as the supervisor process.
    def start_daemonize
      @wait_daemonize_pipe_r, @wait_daemonize_pipe_w = IO.pipe

      if fork
        # console process
        @wait_daemonize_pipe_w.close
        @wait_daemonize_pipe_w = nil
        wait_daemonize
        exit 0
      end

      # daemonize intermediate process
      @wait_daemonize_pipe_r.close
      @wait_daemonize_pipe_r = nil

      # in case the child process forked during run_configure
      @wait_daemonize_pipe_w.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)

      Process.setsid
      exit!(0) if fork
      File.umask(0)

      # supervisor process
      @supervisor_pid = Process.pid
    end

    # Runs in the console process: waits for the supervisor pid to arrive on
    # the pipe and writes it to the pid file. An empty read means the pipe
    # was closed without a pid being written.
    def wait_daemonize
      supervisor_pid = @wait_daemonize_pipe_r.read
      if supervisor_pid.empty?
        # initialization failed
        exit! 1
      end

      @wait_daemonize_pipe_r.close
      @wait_daemonize_pipe_r = nil

      # write pid file
      File.open(@daemonize, "w") {|f|
        f.write supervisor_pid
      }
    end

    # Called once configuration has succeeded: detaches the standard streams
    # and reports the supervisor pid through the pipe. Does nothing when the
    # write end of the pipe is no longer held.
    def finish_daemonize
      if @wait_daemonize_pipe_w
        STDIN.reopen("/dev/null")
        STDOUT.reopen("/dev/null", "w")
        STDERR.reopen("/dev/null", "w")
        @wait_daemonize_pipe_w.write @supervisor_pid.to_s
        @wait_daemonize_pipe_w.close
        @wait_daemonize_pipe_w = nil
      end
    end

    # Forks a main process that runs +block+, then waits for it to finish.
    # If the main process dies within one second of being started and no
    # shutdown was requested, the supervisor itself exits.
    def supervise(&block)
      start_time = Time.now

      $log.info "starting fluentd-#{Fluent::VERSION}"
      @main_pid = fork do
        main_process(&block)
      end

      if @daemonize && @wait_daemonize_pipe_w
        # The supervisor drops its own copy of the write end so that only
        # the main process holds the pipe.
        STDIN.reopen("/dev/null")
        STDOUT.reopen("/dev/null", "w")
        STDERR.reopen("/dev/null", "w")
        @wait_daemonize_pipe_w.close
        @wait_daemonize_pipe_w = nil
      end

      Process.waitpid(@main_pid)
      status = $?
      @main_pid = nil
      ecode = status.to_i

      $log.info "process finished", :code=>ecode

      if !@finished && Time.now - start_time < 1
        $log.warn "process died within 1 second. exit."
        # Process::Status#to_i is the raw, platform dependent wait status
        # (e.g. 256 for "exited with 1"), which would be truncated to 0 when
        # used as an exit code. Propagate the real exit status when the
        # child exited normally; keep the raw value when it was signaled
        # (exitstatus is nil in that case).
        exit(status.exitstatus || ecode)
      end
    end

    # Body of the forked main process: runs +block+ and reports errors to
    # the log (and additionally to STDOUT when the log goes elsewhere).
    # Always terminates the process with exit! 1 when the block returns or
    # raises a StandardError; a block that calls exit leaves through
    # SystemExit instead.
    def main_process(&block)
      begin
        block.call

      rescue Fluent::ConfigError
        $log.error "config error", :file=>@config_path, :error=>$!.to_s
        $log.debug_backtrace
        unless @log.stdout?
          console = Fluent::Log.new(STDOUT, @log_level).enable_debug
          console.error "config error", :file=>@config_path, :error=>$!.to_s
          console.debug_backtrace
        end

      rescue
        $log.error "unexpected error", :error=>$!.to_s
        $log.error_backtrace
        unless @log.stdout?
          console = Fluent::Log.new(STDOUT, @log_level).enable_debug
          console.error "unexpected error", :error=>$!.to_s
          console.error_backtrace
        end
      end

      exit! 1
    end

    # Installs the signal handlers of the supervisor process:
    #   INT / TERM - stop supervising and forward the signal to the main process
    #   HUP        - restart the main process by sending it TERM
    #   USR1       - reopen the log file and forward USR1 to the main process
    def install_supervisor_signal_handlers
      [:INT, :TERM].each do |sig|
        trap sig do
          $log.debug "fluentd supervisor process get SIG#{sig}"
          @finished = true
          if pid = @main_pid
            # kill processes only still exists
            unless Process.waitpid(pid, Process::WNOHANG)
              begin
                Process.kill(sig, pid)
              rescue Errno::ESRCH
                # ignore processes already died
              end
            end
          end
        end
      end

      trap :HUP do
        $log.debug "fluentd supervisor process get SIGHUP"
        $log.info "restarting"
        if pid = @main_pid
          Process.kill(:TERM, pid)
          # don't rescue Errno::ESRCH here (invalid status)
        end
      end

      trap :USR1 do
        $log.debug "fluentd supervisor process get SIGUSR1"
        @log.reopen!
        if pid = @main_pid
          Process.kill(:USR1, pid)
          # don't rescue Errno::ESRCH here (invalid status)
        end
      end
    end

    # Reads the configuration file into @config_data and, when an inline
    # configuration was given, appends it ('-' means read it from STDIN).
    #
    # with_log is for disabling logging before Log#init is called
    def read_config(with_log = true)
      $log.info "reading config file", :path => @config_path if with_log
      @config_fname = File.basename(@config_path)
      @config_basedir = File.dirname(@config_path)
      @config_data = File.read(@config_path)
      if @inline_config == '-'
        @config_data << "\n" << STDIN.read
      elsif @inline_config
        # A literal backslash-n in the inline config stands for a newline.
        @config_data << "\n" << @inline_config.gsub("\\n","\n")
      end
    end

    # Typed view of the <system> section of the configuration.
    class SystemConfig
      include Configurable

      config_param :log_level, :default => nil do |level|
        Log.str_to_level(level)
      end
      config_param :suppress_repeated_stacktrace, :bool, :default => nil
      config_param :emit_error_log_interval, :time, :default => nil
      config_param :suppress_config_dump, :bool, :default => nil

      # conf - the <system> configuration element.
      def initialize(conf)
        super()
        configure(conf)
      end

      # Returns a hash of Supervisor options containing only the parameters
      # that were actually set (non-nil) in the <system> section.
      def to_opt
        opt = {}
        opt[:log_level] = @log_level unless @log_level.nil?
        opt[:suppress_interval] = @emit_error_log_interval unless @emit_error_log_interval.nil?
        opt[:suppress_config_dump] = @suppress_config_dump unless @suppress_config_dump.nil?
        opt[:suppress_repeated_stacktrace] = @suppress_repeated_stacktrace unless @suppress_repeated_stacktrace.nil?
        opt
      end
    end

    # Reads the configuration (without logging, since the logger does not
    # exist yet) and merges the settings of its <system> section, if any,
    # into +opt+ in place.
    #
    # Raises ConfigError when more than one <system> section is present.
    def apply_system_config(opt)
      read_config(false)
      systems = Fluent::Config.parse(@config_data, @config_fname, @config_basedir, @use_v1_config).elements.select { |e|
        e.name == 'system'
      }
      return if systems.empty?
      raise ConfigError, "<system> is duplicated. <system> should be only one" if systems.size > 1

      opt.merge!(SystemConfig.new(systems.first).to_opt)
    end

    # Parses the configuration data loaded by read_config and hands it to
    # Fluent::Engine.run_configure.
    def run_configure
      conf = Fluent::Config.parse(@config_data, @config_fname, @config_basedir, @use_v1_config)
      Fluent::Engine.run_configure(conf)
    end

    # Switches the process to the configured group and user, if any.
    # The group is changed first, then the user.
    def change_privilege
      if @chgroup
        etc_group = Supervisor.get_etc_group(@chgroup)
        Process::GID.change_privilege(etc_group.gid)
      end

      if @chuser
        etc_pw = Supervisor.get_etc_passwd(@chuser)
        user_groups = [etc_pw.gid]
        Etc.setgrent
        Etc.group { |gr| user_groups << gr.gid if gr.mem.include?(etc_pw.name) } # emulate 'id -G'

        Process.groups = Process.groups | user_groups
        Process::UID.change_privilege(etc_pw.uid)
      end
    end

    # Initializes Fluent::Engine, applies the suppress settings, requires
    # the configured libraries and loads plugins from every plugin
    # directory that exists.
    def init_engine
      require 'fluent/load'
      Fluent::Engine.init
      if @suppress_interval
        Fluent::Engine.suppress_interval(@suppress_interval)
      end
      Fluent::Engine.suppress_config_dump = @suppress_config_dump

      @libs.each {|lib|
        require lib
      }

      @plugin_dirs.each {|dir|
        if Dir.exist?(dir)
          dir = File.expand_path(dir)
          Fluent::Engine.load_plugin_dir(dir)
        end
      }
    end

    # Installs the signal handlers of the main process:
    #   INT / TERM - stop the engine (once)
    #   HUP        - only logged for now
    #   USR1       - reopen the log file and flush buffered events
    def install_main_process_signal_handlers
      # Strictly speaking, these signal handling is not thread safe.
      # But enough safe to limit twice call of Fluent::Engine.stop.

      [:INT, :TERM].each do |sig|
        trap sig do
          $log.debug "fluentd main process get SIG#{sig}"
          unless @finished
            @finished = true
            $log.debug "getting start to shutdown main process"
            Fluent::Engine.stop
          end
        end
      end

      trap :HUP do
        # TODO
        $log.debug "fluentd main process get SIGHUP"
      end

      trap :USR1 do
        $log.debug "fluentd main process get SIGUSR1"
        $log.info "force flushing buffered events"
        @log.reopen!

        # Creating new thread due to mutex can't lock
        # in main thread during trap context
        Thread.new {
          begin
            Fluent::Engine.flush!
            $log.debug "flushing thread: flushed"
          rescue Exception => e
            # Every failure of the flushing thread is logged rather than
            # left to terminate the thread unreported.
            $log.warn "flushing thread error: #{e}"
          end
        }.run
      end
    end

    # Runs the engine by calling Fluent::Engine.run.
    def run_engine
      Fluent::Engine.run
    end
  end
end