module Filbunke
  # Long-running process that keeps one or more repositories up to date.
  #
  # One Client is built per entry in config["repositories"]; #run! then forks
  # one child process per client, and each child repeatedly calls
  # Client#update_files! and persists the checkpoint it returns.
  class Daemon
    # config - Hash of settings. Keys read by this class: "pid_file",
    #          "log_file", "log_level", "repositories", "run_every",
    #          "batch_size", "callback_path" and "checkpoint_path".
    # local  - when true no pid file is written; the flag is also forwarded
    #          to the logger.
    def initialize(config, local = false)
      @config = config
      @clients = []
      setup_clients!(local)
      write_pid!(config["pid_file"]) unless local
    end

    # Creates the logger and one Client per configured repository.
    #
    # Any error raised while building a client is logged (with backtrace) and
    # then re-raised, so a broken repository configuration aborts start-up.
    def setup_clients!(local = false)
      @logger = FilbunkeLogger.new(@config["log_file"], local, @config["log_level"])
      @logger.log("Initializing filbunked")

      @config["repositories"].each do |repository_name, repository_config|
        @logger.log("Initializing repository: #{repository_name}")
        @clients << begin
          build_client(repository_config)
        rescue => e
          @logger.error("Failed to initialize #{repository_name}; #{format_exception(e)}")
          raise
        end
      end
    end

    # Forks one child per client and blocks until every child has exited.
    def run!
      version_file = ::File.expand_path(::File.join(::File.dirname(__FILE__), "../../VERSION"))
      version = ::File.read(version_file).chomp
      @logger.log("Starting filbunked version #{version}")

      @parent_pid = Process.pid
      client_pids = @clients.map do |client|
        fork { run_client_loop(client) }
      end

      client_pids.each { |pid| Process.wait(pid) }
    end

    # Returns true when signal 0 can be delivered to +pid+, false otherwise.
    #
    # Every StandardError raised by the probe is reported as false.
    def process_is_alive(pid)
      !!Process.kill(0, pid)
    rescue StandardError
      false
    end

    private

    # Builds the Client for a single repository.
    #
    # Per-repository "run_every" / "batch_size" fall back to the daemon-wide
    # values and finally to 10 / 0. The resolved values are written back into
    # +repository_config+ before it is handed to Repository.new.
    def build_client(repository_config)
      repository_config["run_every"] =
        repository_config.fetch("run_every", @config.fetch("run_every", 10))
      repository_config["batch_size"] =
        repository_config.fetch("batch_size", @config.fetch("batch_size", 0))

      repository = Repository.new(repository_config)
      callbacks = build_callbacks(repository_config["callbacks"])

      Client.new(repository, @logger, callbacks, repository_config["failed_request_log_file_name"])
    end

    # Loads and instantiates every configured callback.
    #
    # callback_configs - Hash of callback name => callback config. A missing
    #                    (nil) value is treated as "no callbacks".
    #
    # The file <callback_path>/<name> is required lazily, and the class name is
    # derived from the callback name ("foo_bar" -> FooBar).
    def build_callbacks(callback_configs)
      (callback_configs || {}).map do |callback_name, callback_config|
        name = callback_name.to_s
        require ::File.join(@config["callback_path"], name)
        callback_class = Module.const_get(name.split("_").map(&:capitalize).join)
        callback_class.new(@logger, callback_config)
      end
    end

    # Body of a forked child: update, store the checkpoint, sleep, repeat for
    # as long as the parent process is alive.
    #
    # On any StandardError the failure is logged, the parent is sent KILL and
    # the child exits with status 1.
    def run_client_loop(client)
      while process_is_alive(@parent_pid)
        new_checkpoint = client.update_files!(checkpoint_for_repository(client.repository))
        @logger.info "Update finished for #{client.repository.name} new checkpoint => #{new_checkpoint}"
        update_checkpoint_for_repository(client.repository, new_checkpoint)
        sleep client.repository.run_every
      end
    rescue => e
      @logger.error("#{client.repository.name} Died.. #{format_exception(e)}")
      Process.kill("KILL", @parent_pid)
      exit 1
    end

    # Renders an exception as "Class - message" followed by its backtrace,
    # one frame per line.
    def format_exception(error)
      ["#{error.class} - #{error.message}", *error.backtrace].join("\n\t")
    end

    # Reads the stored checkpoint for +repository+.
    #
    # Best effort: returns 0 when the checkpoint cannot be read for any
    # reason (missing file, empty file, ...).
    def checkpoint_for_repository(repository)
      ::File.open(checkpoint_file_name_for_repository(repository), 'r') { |f| f.readline.to_i }
    rescue StandardError
      0
    end

    # Overwrites the checkpoint file of +repository+ with +checkpoint+.
    # repository.file_umask is passed as the permission argument used when
    # the file has to be created.
    def update_checkpoint_for_repository(repository, checkpoint)
      path = checkpoint_file_name_for_repository(repository)
      ::File.open(path, 'w', repository.file_umask) { |f| f.write(checkpoint) }
    end

    # Returns the checkpoint file path for +repository+, creating the
    # checkpoint directory first if needed.
    def checkpoint_file_name_for_repository(repository)
      FileUtils.mkdir_p(@config["checkpoint_path"], :mode => repository.directory_umask)
      ::File.join(@config["checkpoint_path"], repository.name + ".checkpoint")
    end

    # Kills the process recorded in the pid file (if any) and then records
    # the current pid there.
    #
    # A failure to kill the previous process is logged and ignored.
    def write_pid!(pid_file_path)
      begin
        existing_process = ::File.read(pid_file_path).to_i if ::File.readable?(pid_file_path)
        # A stale pid file may hold our own pid (pids get reused, notably in
        # containers); killing it would terminate this daemon on start-up.
        if existing_process && existing_process > 0 && existing_process != Process.pid
          @logger.info("killing existing process #{existing_process} from #{pid_file_path}")
          Process.kill("KILL", existing_process)
        end
      rescue => e
        @logger.warn("failed to kill existing pid from #{pid_file_path}: #{e}\n\twill ignore and continue...")
      end

      ::File.open(pid_file_path, 'w') { |f| f.write(Process.pid.to_i) }
    end
  end
end