module Filbunke class Daemon def initialize(config, local = false) @config = config @clients = [] setup_clients!(local) write_pid!(config["pid_file"]) unless local end def setup_clients!(local = false) @logger = Logger.new(@config["log_file"], local) @logger.log("Initializing filbunked") @config["repositories"].each do |repository_name, repository_config| @logger.log("Initializing repository: #{repository_name}") @clients << begin repository = Repository.new(repository_config["filbunke_server_repository"], repository_config["filbunke_server_host"], repository_config["filbunke_server_port"], repository_config["local_path"], repository_config["file_umask"].to_i, repository_config["directory_umask"].to_i, repository_config["file_url_username"], repository_config["file_url_password"], repository_config["hadoop_binary"]) callbacks = [] repository_config["callbacks"].each do |callback_name, callback_config| require ::File.join(@config["callback_path"], callback_name.to_s) callback_class = Module.const_get(callback_name.split("_").map(&:capitalize).join) callbacks << callback_class.new(@logger, callback_config) end Client.new(repository, @logger, callbacks) end end end def run! version = ::File.read(::File.expand_path(::File.join(::File.dirname(__FILE__), "../../VERSION"))).chomp @logger.log("Starting filbunked version #{version}") while true begin @clients.each do |client| new_checkpoint = client.update_files!(checkpoint_for_repository(client.repository)) update_checkpoint_for_repository(client.repository, new_checkpoint) end rescue StandardError => e @logger.log("Died.. #{e.message}") exit 1 end sleep @config["run_every"] end end private def checkpoint_for_repository(repository) ::File.open(checkpoint_file_name_for_repository(repository), 'r') {|f| f.readline.to_i } rescue 0 end def update_checkpoint_for_repository(repository, checkpoint) f = ::File.new(checkpoint_file_name_for_repository(repository), 'w', repository.file_umask) f.write(checkpoint) f.close end def checkpoint_file_name_for_repository(repository) FileUtils.mkdir_p(@config["checkpoint_path"], :mode => repository.directory_umask) ::File.join(@config["checkpoint_path"], repository.name + ".checkpoint") end def write_pid!(pid_file_path) ::File.open(pid_file_path, 'w') do |f| f.write(Process.pid.to_i) f.close end end end end