require 'shellwords'

module Filbunke
  # Client for a Filbunke repository server. It pulls the list of files that
  # changed since a checkpoint, mirrors them under the repository's local path
  # (via HTTP(S) or `hadoop fs`), and registers updated/deleted files on the
  # server.
  class Client
    attr_reader :repository

    UPDATES_ACTION = 'updates'.freeze
    FILES_ACTION = 'files'.freeze
    LAST_CHECKPOINT_ACTION = 'last_checkpoint'.freeze
    TOUCH_ACTION = 'touch'.freeze
    URL_KEY = 'url'.freeze
    FROM_CHECKPOINT_KEY = 'from_checkpoint'.freeze
    HASH_KEY = 'hash'.freeze

    # @param repository [Object] must respond to name, host, port, local_path,
    #   user, pass and hadoop_binary (presumably a Filbunke::Repository -- TODO confirm)
    # @param logger [#log] receives progress and error messages
    # @param callbacks [Array<#on_update, #on_no_change, #on_delete>] notified per file
    # @param failed_request_log_file_name [String, nil] when set, failed register
    #   requests are appended to this file as replayable curl commands
    def initialize(repository, logger, callbacks = [], failed_request_log_file_name = nil)
      @repository = repository
      @logger = logger
      @callbacks = callbacks
      @failed_request_log_file_name = failed_request_log_file_name
    end

    # Applies every change the server reports since +last_checkpoint+.
    # Each file is handled in one of three ways:
    # - deleted files are removed locally;
    # - files whose local MD5 matches are left alone;
    # - everything else is (re)downloaded.
    # Yields each file whose update succeeded (or, for HTTP, was queued).
    #
    # @return the server's new checkpoint when every file was handled. If
    #   anything failed, or the server sent no checkpoint, returns
    #   +last_checkpoint+ so the same batch is retried next time.
    def with_updated_files(last_checkpoint)
      updates = get_updated_file_list(last_checkpoint)
      updated_files = updates["files"] || []
      new_checkpoint = updates["checkpoint"]
      failure = false

      unless updated_files.empty?
        @logger.log "Updating repository: #{repository.name}: #{updated_files.size} files. Checkpoint: #{last_checkpoint} ==> #{new_checkpoint}"
      end

      @hydra = Typhoeus::Hydra.new(:max_concurrency => 10)
      @async_requests = []

      updated_files.each do |raw_file|
        file = File.new(raw_file)
        # NOTE(review): file.path comes from the server response and is not
        # checked for '..' segments before being joined to local_path.
        local_file_path = ::File.join(repository.local_path, file.path)

        if file.state == "DELETED"
          delete_file!(local_file_path)
          run_callbacks_delete(file, local_file_path)
        elsif !file_needs_update?(file, local_file_path)
          @logger.log "File exists with correct hash: #{local_file_path}"
          run_callbacks_no_change(file, local_file_path)
        elsif update_file!(file, local_file_path)
          # NOTE(review): for http(s) URLs update_file! only queues the download,
          # so this yield happens before the file is written (downloads run in
          # @hydra.run below). Confirm callers do not read the file here.
          yield file
        else
          @logger.log "Unable to get file #{file.url} ==> #{file.path}!"
          failure = true
        end
      end

      @hydra.run
      # on_complete handlers return false when a download/write failed.
      failure ||= @async_requests.any? { |request| request.handled_response == false }
      failure ? last_checkpoint : (new_checkpoint || last_checkpoint)
    end

    # Same as with_updated_files, without a per-file block.
    def update_files!(last_checkpoint)
      with_updated_files(last_checkpoint) {}
    end

    # Registers +path+ on the server as available at +url+, optionally with its
    # MD5 +hash+. Raises if the server does not answer 204; the failed request
    # is first logged as a curl command when a failure log file is configured.
    def register_updated_file!(path, url, hash = nil)
      # Form-encode the values so URLs containing '&', '+', '#' or '?' survive intact.
      query = "#{URL_KEY}=#{URI.encode_www_form_component(url)}"
      query += "&#{HASH_KEY}=#{URI.encode_www_form_component(hash)}" if hash
      register_path = "/#{FILES_ACTION}/#{@repository.name}/#{path}?#{query}"
      send_registration!(Net::HTTP::Put, 'PUT', register_path, "Failed to register updated file: #{path}")
    end

    # Tells the server +path+ was deleted. Raises if the server does not
    # answer 204; the failed request is first logged as a curl command.
    def register_deleted_file!(path)
      register_path = "/#{FILES_ACTION}/#{@repository.name}/#{path}"
      send_registration!(Net::HTTP::Delete, 'DELETE', register_path, "Failed to register deleted file: #{path}")
    end

    # Sends PUT /updates/<repo>/touch; raises unless the server answers 204.
    def touch_repository!
      touch_http = Net::HTTP.new(@repository.host, @repository.port)
      touch_http.start do |http|
        touch_path = "/#{UPDATES_ACTION}/#{@repository.name}/#{TOUCH_ACTION}"
        response = http.request(Net::HTTP::Put.new(touch_path))
        raise "Failed to touch repository: #{@repository.name}" unless response.code.to_i == 204
      end
    end

    # Fetches the server's last checkpoint for this repository as an Integer.
    # Raises unless the server answers 200.
    def last_checkpoint
      last_checkpoint_http = Net::HTTP.new(@repository.host, @repository.port)
      last_checkpoint_http.start do |http|
        last_checkpoint_path = "/#{UPDATES_ACTION}/#{@repository.name}/#{LAST_CHECKPOINT_ACTION}"
        response = http.request(Net::HTTP::Get.new(last_checkpoint_path))
        if response.code.to_i != 200
          raise "Failed to get last checkpoint for repository: #{@repository.name}"
        end
        return response.body.chomp.to_i
      end
    end

    private

    # Sends one registration request and raises +failure_message+ unless the
    # server answers 204. Any exception is logged as a replayable curl command
    # and then re-raised. The rescue is deliberately broad so the record is
    # kept even when the request is interrupted.
    def send_registration!(request_class, curl_verb, register_path, failure_message)
      register_http = Net::HTTP.new(@repository.host, @repository.port)
      register_http.start do |http|
        response = http.request(request_class.new(register_path))
        raise failure_message if response.code.to_i != 204
      end
    rescue Exception => e
      log_failed_request(%Q{curl -X#{curl_verb} "http://#{@repository.host}:#{@repository.port}#{register_path}"}, e)
      raise
    end

    # Appends a timestamped error line plus +failed_request_command+ to the
    # failure log; a no-op when no log file name was configured.
    def log_failed_request(failed_request_command, e)
      return unless @failed_request_log_file_name
      ::File.open(@failed_request_log_file_name, 'a+') do |f|
        f.puts("# #{Time.now}: #{e.message if e}")
        f.puts(failed_request_command)
      end
    end

    # Dispatches on the URL scheme. HTTP(S) downloads are only queued on
    # @hydra and get their callbacks from the on_complete handler. HDFS/HFTP
    # copies run synchronously and get callbacks here.
    # Returns truthy when the file was updated or queued; raises for other schemes.
    def update_file!(file, local_file_path)
      case file.url
      when %r{\Ahttps?://}
        update_http_file!(file, local_file_path)
      when %r{\A(?:hdfs|hftp)://}
        success = update_hdfs_file!(file, local_file_path)
        run_callbacks(file, local_file_path) if success
        success
      else
        raise "Unsupported protocol for file: #{file.inspect}"
      end
    end

    def run_callbacks(file, local_file_path)
      @callbacks.each { |callback| callback.on_update(file, local_file_path) }
    end

    def run_callbacks_no_change(file, local_file_path)
      @callbacks.each { |callback| callback.on_no_change(file, local_file_path) }
    end

    def run_callbacks_delete(file, local_file_path)
      @callbacks.each { |callback| callback.on_delete(file, local_file_path) }
    end

    # True when the server gave no hash, the local file is missing, or the
    # local file's MD5 differs from the server's hash.
    def file_needs_update?(file, local_file_path)
      return true if file.hash.to_s.empty?
      return true unless ::File.exist?(local_file_path)
      # Digest::MD5.file streams the file instead of loading it all into memory.
      Digest::MD5.file(local_file_path).hexdigest != file.hash
    end

    # Returns the parsed JSON change list since +last_checkpoint+. Any HTTP
    # error, non-200 status or parse error is logged and yields {}, so the
    # caller treats it as "no updates".
    def get_updated_file_list(last_checkpoint)
      begin
        updates_http = Net::HTTP.new(@repository.host, @repository.port)
        updates_http.start do |http|
          begin
            updates_path = "/#{UPDATES_ACTION}/#{@repository.name}?#{FROM_CHECKPOINT_KEY}=#{last_checkpoint}"
            response = http.request(Net::HTTP::Get.new(updates_path))
            if response.code.to_i == 200
              JSON.parse(response.body)
            else
              @logger.log "Failed to download updates for #{@repository.name}, error code = #{response.code}"
              {}
            end
          rescue StandardError => e
            @logger.log "Error getting file list: #{e.message}! Retrying later.."
            {}
          end
        end
      rescue StandardError => e
        @logger.log "Unable to create HTTP connection to #{@repository.host}:#{@repository.port} (#{e.message})!"
        return {}
      end
    end

    # Queues an async download of file.url on @hydra. The on_complete handler
    # writes the body and runs update callbacks only if the write succeeded.
    # Its return value (true/false) is what with_updated_files checks through
    # handled_response. Returns true once queued, false if queueing failed.
    def update_http_file!(file, local_file_path)
      async_request =
        if @repository.user
          Typhoeus::Request.new(file.url, :username => @repository.user, :password => @repository.pass)
        else
          Typhoeus::Request.new(file.url)
        end

      async_request.on_complete do |response|
        begin
          success =
            if response.code.to_i == 200
              write_file!(local_file_path, response.body)
            else
              @logger.log "Failed to update file #{file.url}, error code = #{response.code}"
              false
            end
          run_callbacks(file, local_file_path) if success
          success
        rescue StandardError => e
          @logger.log "Failed to update file #{file.url}: #{e.message}"
          false
        end
      end

      @hydra.queue async_request
      @async_requests << async_request
      true
    rescue StandardError => e
      @logger.log "Failed to update file #{file.url}: #{e.message}"
      false
    end

    # Copies an hdfs:// or hftp:// file to "<local_file_path>.tmp" with
    # `hadoop fs -copyToLocal` and then moves it into place.
    # Returns true on success; logs and returns false on any failure.
    def update_hdfs_file!(file, local_file_path)
      tmp_path = "#{local_file_path}.tmp"
      ::FileUtils.mkdir_p(::File.dirname(local_file_path))
      ::FileUtils.rm_f(tmp_path)

      # Build a new string: gsub! on file.url would rewrite the file object's URL.
      url = file.url
      if url.start_with?("hdfs://")
        url = url.gsub(%r{hdfs://}, "hftp://").gsub(":8020", "")
      end

      # Escape the arguments so paths with spaces or shell metacharacters work.
      hdfs_cmd = "#{@repository.hadoop_binary} fs -copyToLocal #{Shellwords.escape(url)} #{Shellwords.escape(tmp_path)}"
      @logger.log "Trying to update #{local_file_path} with '#{hdfs_cmd}'"

      status, error_output = run_command(hdfs_cmd)
      unless status.success?
        @logger.log "Failed to update hdfs file #{file.url}! Unable to execute #{hdfs_cmd}"
        @logger.log "hadoop stderr: #{error_output.strip}" unless error_output.strip.empty?
        return false
      end

      begin
        ::FileUtils.mv tmp_path, local_file_path
        true
      rescue StandardError => e
        @logger.log "Failed to move hdfs file #{file.url}: #{e.message}"
        false
      end
    rescue StandardError => e
      @logger.log "Failed to update hdfs file #{file.url}: #{e.message}"
      false
    end

    # Runs +command+ via Open4 and returns [Process::Status, stderr_text].
    # It closes the child's stdin and drains stdout and stderr (stderr on its
    # own thread) before reaping the child, so the child cannot block on a
    # full pipe. All pipe descriptors are closed afterwards.
    def run_command(command)
      pid, stdin, stdout, stderr = Open4::popen4(command)
      stdin.close
      stderr_reader = Thread.new { stderr.read }
      stdout.read
      error_output = stderr_reader.value.to_s
      _, status = Process.waitpid2(pid)
      [status, error_output]
    ensure
      [stdin, stdout, stderr].each { |io| io.close if io && !io.closed? }
    end

    # Writes +contents+ to "<file_path>.tmp" and renames it over +file_path+,
    # so readers never see a partially written file.
    # Returns true on success; logs and returns false on failure.
    def write_file!(file_path, contents)
      ::FileUtils.mkdir_p(::File.dirname(file_path))
      @logger.log("Updating: #{file_path}")
      tmp_path = "#{file_path}.tmp"
      begin
        # Binary mode: the downloaded body is written byte-for-byte.
        ::File.open(tmp_path, 'wb') { |f| f.write(contents) }
        ::FileUtils.mv tmp_path, file_path
        true
      rescue StandardError => e
        @logger.log "Failed to write file #{file_path}: #{e.message}"
        false
      end
    end

    # Deletes +file_path+ if it exists; a missing file is not an error.
    def delete_file!(file_path)
      return unless ::File.exist?(file_path)
      @logger.log("Deleting: #{file_path}")
      ::File.delete(file_path)
    end
  end
end