require 'shellwords'

module Filbunke
  # Client for a Filbunke repository server.
  #
  # - Fetches the list of files changed since a checkpoint.
  # - Mirrors those files into the repository's local_path, over HTTP(S)
  #   via Typhoeus or from HDFS via the configured hadoop binary.
  # - Runs the registered batch callbacks.
  # - Can register file updates/deletions back with the server.
  class Client
    attr_reader :repository

    UPDATES_ACTION = 'updates'.freeze
    FILES_ACTION = 'files'.freeze
    LAST_CHECKPOINT_ACTION = 'last_checkpoint'.freeze
    TOUCH_ACTION = 'touch'.freeze
    URL_KEY = 'url'.freeze
    FROM_CHECKPOINT_KEY = 'from_checkpoint'.freeze
    HASH_KEY = 'hash'.freeze

    # @param repository repository config. This class reads name, host, port,
    #   local_path, hydra_concurrency, user, pass and hadoop_binary from it.
    # @param logger Logger-like object (info/debug/warn/error).
    # @param callbacks objects responding to on_update_batch,
    #   on_no_change_batch and on_delete_batch.
    # @param failed_request_log_file_name optional path. Failed register_*
    #   requests are appended there as replayable curl commands.
    def initialize(repository, logger, callbacks = [], failed_request_log_file_name = nil)
      @repository = repository
      @logger = logger
      @callbacks = callbacks
      @failed_request_log_file_name = failed_request_log_file_name
      @hydra = Typhoeus::Hydra.new(:max_concurrency => @repository.hydra_concurrency)
      @async_requests = []
      @failed_async_files = []
      @logger.info "initialized client for repository '#{@repository.name}'; #{@repository.inspect}"
    end

    # Synchronises the local mirror with every change since last_checkpoint.
    #
    # Each successfully fetched or queued file is yielded to the block.
    # NOTE(review): for http(s) URLs the download is only queued at yield
    # time; it completes during @hydra.run below, so the block may run
    # before the file is on disk.
    #
    # Callbacks run only when every file was fetched and written.
    #
    # @param last_checkpoint checkpoint the server should report changes from
    # @return the server's new checkpoint (or last_checkpoint if the server
    #   sent none) on success; last_checkpoint on any failure
    # @raise RuntimeError if a file has an unsupported URL scheme
    def with_updated_files(last_checkpoint)
      updates = get_updated_file_list(last_checkpoint)
      updated_files = updates["files"] || []
      new_checkpoint = updates["checkpoint"]
      failure = false
      if updated_files.size > 0
        @logger.info "Updating repository: #{@repository.name}: #{updated_files.size} files. Checkpoint: #{last_checkpoint} ==> #{new_checkpoint}"
      end

      # Per-run state filled in by update_http_file! and its on_complete hooks.
      @async_requests = []
      @failed_async_files = []
      callbacks_on_update = []
      callbacks_on_no_change = []
      callbacks_on_delete = []

      updated_files.each do |raw_file|
        # Filbunke::File (the project's file record), not ::File.
        file = File.new(raw_file)
        local_file_path = ::File.join(@repository.local_path, file.path)
        if file.state == "DELETED"
          delete_file!(local_file_path)
          callbacks_on_delete << callback_entry(file, local_file_path)
        elsif !file_needs_update?(file, local_file_path)
          @logger.debug "File exists with correct hash: #{local_file_path}"
          callbacks_on_no_change << callback_entry(file, local_file_path)
        elsif update_file!(file, local_file_path)
          yield file
          callbacks_on_update << callback_entry(file, local_file_path)
        else
          @logger.error "Unable to get file #{file.url} ==> #{file.path}!"
          failure = true
        end
      end

      # Performs every queued HTTP download.
      @hydra.run
      failure ||= async_requests_failed?

      if failure
        @logger.error "FAILED to update files for #{@repository.name} last_checkpoint = #{last_checkpoint}"
        return last_checkpoint
      end

      @logger.info "Done fetching files for #{@repository.name}, processing callbacks..."
      begin
        run_callbacks_delete(callbacks_on_delete)
        run_callbacks(callbacks_on_update)
        run_callbacks_no_change(callbacks_on_no_change)
        new_checkpoint || last_checkpoint
      rescue StandardError => e
        msg = ["Callbacks failed to run; #{e.class} - #{e.message}", *e.backtrace].join("\n\t")
        @logger.error "FAILED to update files for #{@repository.name} last_checkpoint = #{last_checkpoint}; #{msg}"
        last_checkpoint
      end
    end

    # with_updated_files without a per-file block.
    def update_files!(last_checkpoint)
      with_updated_files(last_checkpoint) {}
    end

    # PUTs path -> url (and optional content hash) to the server.
    # NOTE(review): url and hash are interpolated into the query string
    # without URL-encoding; a url containing '&' or '#' would be truncated.
    # TODO confirm whether callers pre-encode.
    # @raise RuntimeError unless the server answers 204; the request is
    #   logged to the failed-request file before re-raising.
    def register_updated_file!(path, url, hash = nil)
      register_path = "/#{FILES_ACTION}/#{@repository.name}/#{path}?#{URL_KEY}=#{url}"
      register_path += "&#{HASH_KEY}=#{hash}" if hash
      send_registration(Net::HTTP::Put, 'PUT', register_path, "Failed to register updated file: #{path}")
    end

    # DELETEs path on the server.
    # @raise RuntimeError unless the server answers 204; the request is
    #   logged to the failed-request file before re-raising.
    def register_deleted_file!(path)
      register_path = "/#{FILES_ACTION}/#{@repository.name}/#{path}"
      send_registration(Net::HTTP::Delete, 'DELETE', register_path, "Failed to register deleted file: #{path}")
    end

    # PUTs the repository's touch endpoint.
    # @raise RuntimeError unless the server answers 204
    def touch_repository!
      touch_http = Net::HTTP.new(@repository.host, @repository.port)
      touch_http.start do |http|
        touch_path = "/#{UPDATES_ACTION}/#{@repository.name}/#{TOUCH_ACTION}"
        response = http.request(Net::HTTP::Put.new(touch_path))
        raise "Failed to touch repository: #{@repository.name}" unless response.code.to_i == 204
      end
    end

    # @return [Integer] the server's latest checkpoint (body parsed with to_i)
    # @raise RuntimeError unless the server answers 200
    def last_checkpoint
      last_checkpoint_http = Net::HTTP.new(@repository.host, @repository.port)
      last_checkpoint_http.start do |http|
        last_checkpoint_path = "/#{UPDATES_ACTION}/#{@repository.name}/#{LAST_CHECKPOINT_ACTION}"
        response = http.request(Net::HTTP::Get.new(last_checkpoint_path))
        raise "Failed to get last checkpoint for repository: #{@repository.name}" unless response.code.to_i == 200
        response.body.chomp.to_i
      end
    end

    private

    # Appends a timestamped error line and a replayable command to the
    # failed-request log, if one was configured.
    def log_failed_request(failed_request_command, e)
      return unless @failed_request_log_file_name
      ::File.open(@failed_request_log_file_name, 'a+') do |f|
        f.puts("# #{Time.now}: #{e.message if e}")
        f.puts(failed_request_command)
      end
    end

    # Sends a registration request and requires a 204 reply.
    # Rescues Exception (not just StandardError) only to log the request
    # for replay; the exception is always re-raised unchanged.
    def send_registration(request_class, verb, register_path, failure_message)
      register_http = Net::HTTP.new(@repository.host, @repository.port)
      register_http.start do |http|
        response = http.request(request_class.new(register_path))
        raise failure_message if response.code.to_i != 204
      end
    rescue Exception => e
      log_failed_request(%Q{curl -X#{verb} "http://#{@repository.host}:#{@repository.port}#{register_path}"}, e)
      raise
    end

    # Dispatches on the URL scheme.
    # @return true if the file was fetched (hdfs) or queued (http/https)
    # @raise RuntimeError for any other scheme
    def update_file!(file, local_file_path)
      case file.url
      when %r{\Ahttps?://} then update_http_file!(file, local_file_path)
      when %r{\Ahdfs://} then update_hdfs_file!(file, local_file_path)
      else raise "Unsupported protocol for file: #{file.inspect}"
      end
    end

    def run_callbacks(files)
      @callbacks.each { |callback| callback.on_update_batch(files) }
    end

    def run_callbacks_no_change(files)
      @callbacks.each { |callback| callback.on_no_change_batch(files) }
    end

    def run_callbacks_delete(files)
      @callbacks.each { |callback| callback.on_delete_batch(files) }
    end

    # Record handed to the *_batch callbacks (responds to #file and #local_file_path).
    def callback_entry(file, local_file_path)
      OpenStruct.new(:file => file, :local_file_path => local_file_path)
    end

    # True when there is no usable hash, the local file is missing, or its
    # MD5 differs from file.hash.
    def file_needs_update?(file, local_file_path)
      return true if file.hash.nil? || file.hash == ""
      return true unless ::File.exist?(local_file_path)
      # Streams the file instead of loading it entirely into memory.
      Digest::MD5.file(local_file_path).hexdigest != file.hash
    end

    # True if any queued HTTP request got no response or a non-200 response,
    # or if any on_complete hook failed to write its file to disk.
    def async_requests_failed?
      http_failed = @async_requests.any? do |request|
        failed = request.response.nil? || request.response.code != 200
        @logger.warn "request did not handle response: #{request.inspect}" if failed
        failed
      end
      http_failed || !@failed_async_files.empty?
    end

    # Fetches the JSON list of changes since last_checkpoint.
    # @return [Hash] the parsed body, or {} on any HTTP/parse error (logged)
    def get_updated_file_list(last_checkpoint)
      updates_http = Net::HTTP.new(@repository.host, @repository.port)
      updates_http.read_timeout = 300 # default is 60 seconds
      updates_http.start do |http|
        updates_path = "/#{UPDATES_ACTION}/#{@repository.name}?#{FROM_CHECKPOINT_KEY}=#{last_checkpoint}"
        begin
          @logger.info "Fetching updated file list from #{updates_path}"
          response = http.request(Net::HTTP::Get.new(updates_path))
          if response.code.to_i == 200
            JSON.parse(response.body)
          else
            @logger.error "Failed to download updates for #{@repository.name}, error code = #{response.code}"
            {}
          end
        rescue StandardError => e
          @logger.error "Error getting file list from http://#{@repository.host}:#{@repository.port}#{updates_path}: #{e.message}! Retrying later.."
          {}
        end
      end
    rescue StandardError => e
      @logger.error "Unable to create HTTP connection to #{@repository.host}:#{@repository.port} (#{e.message})!"
      {}
    end

    # Percent-escapes a URL the way URI.encode did. URI.encode was removed in
    # Ruby 3.0, so fall back to the RFC 2396 parser's escape on newer Rubies.
    def escape_url(url)
      return URI.encode(url) if URI.respond_to?(:encode)
      URI::RFC2396_Parser.new.escape(url)
    end

    # Queues an asynchronous download on @hydra; nothing is fetched until
    # @hydra.run. The on_complete hook writes the body to local_file_path and
    # records the file in @failed_async_files if that does not succeed.
    # @return true if queued, false if building the request raised
    def update_http_file!(file, local_file_path)
      options = { :followlocation => true }
      if @repository.user
        options[:username] = @repository.user
        options[:password] = @repository.pass
      end
      async_request = Typhoeus::Request.new(escape_url(file.url), options)
      async_request.on_complete do |response|
        success = false
        begin
          if response.code.to_i == 200
            # A 200 only counts once the body is safely on disk.
            success = write_file!(local_file_path, response.body)
          else
            body_if_error = response.code >= 500 ? ", body = #{response.body}" : ""
            @logger.warn "Failed to update file #{file.url}, got status code = #{response.code}#{body_if_error}"
          end
        rescue StandardError => e
          msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t")
          @logger.error "Failed to update file #{file.url}: #{msg}"
        end
        (@failed_async_files ||= []) << file unless success
        # return the async_request.handled_response value here
        success
      end
      @hydra.queue async_request
      @async_requests << async_request
      true
    rescue StandardError => e
      msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t")
      @logger.error "Failed to update file #{file.url}: #{msg}"
      false
    end

    # Copies an hdfs:// URL to local_file_path.
    #
    # Runs `hadoop dfs -copyToLocal` into a .tmp file, then moves it into
    # place. The namenode authority is stripped from the URL first.
    #
    # @return true on success, false on any failure (logged)
    def update_hdfs_file!(file, local_file_path)
      tmp_path = "#{local_file_path}.tmp"
      ::FileUtils.mkdir_p(::File.dirname(local_file_path))
      ::FileUtils.rm_f(tmp_path)
      # Non-destructive rewrite: file.url must stay intact for logs and callbacks.
      url = file.url.gsub(/hdfs:\/\/([^\/]*)(.*)/, "hdfs://\\2")
      # The URL comes from the server, so escape it (and the path) for the shell.
      hdfs_cmd = "#{@repository.hadoop_binary} dfs -copyToLocal #{Shellwords.escape(url)} #{Shellwords.escape(tmp_path)}"

      error_output = ''
      # Block form: Open4 closes all pipes and returns the child's Process::Status.
      status = Open4::popen4(hdfs_cmd) do |_pid, stdin, stdout, stderr|
        stdin.close
        # Drain stdout concurrently so neither pipe can fill up and block the child.
        stdout_drainer = Thread.new { stdout.read }
        error_output = stderr.read.to_s
        stdout_drainer.join
      end

      unless status.exitstatus == 0
        @logger.error "Failed to update hdfs file #{file.url}! Unable to execute #{hdfs_cmd}"
        @logger.error "hdfs stderr: #{error_output.strip}" unless error_output.strip.empty?
        return false
      end

      begin
        ::FileUtils.mv tmp_path, local_file_path
        true
      rescue StandardError => e
        msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t")
        @logger.error "Failed to move hdfs file #{file.url}: #{msg}"
        false
      end
    rescue StandardError => e
      msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t")
      @logger.error "Failed to update hdfs file #{file.url}: #{msg}"
      false
    end

    # Writes contents to file_path via a .tmp file and a rename.
    # The .tmp file is removed if the write or move fails.
    # @return true on success, false on failure (logged)
    def write_file!(file_path, contents)
      ::FileUtils.mkdir_p(::File.dirname(file_path))
      @logger.debug("Updating: #{file_path}")
      tmp_path = "#{file_path}.tmp"
      begin
        # Binary mode: the body is written byte-for-byte.
        ::File.open(tmp_path, 'wb') { |io| io.write(contents) }
        ::FileUtils.mv tmp_path, file_path
        true
      rescue StandardError => e
        ::FileUtils.rm_f(tmp_path)
        msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t")
        @logger.error "Failed to move file #{file_path}: #{msg}"
        false
      end
    end

    # Deletes file_path if it exists.
    def delete_file!(file_path)
      return unless ::File.exist?(file_path)
      @logger.debug("Deleting: #{file_path}")
      ::File.delete(file_path)
    end
  end
end