lib/filbunke/client.rb in filbunke-1.12.0 vs lib/filbunke/client.rb in filbunke-1.13.0

- old
+ new

@@ -23,22 +23,21 @@ end def with_updated_files(last_checkpoint) updates = get_updated_file_list(last_checkpoint) updated_files = updates["files"] || [] - failure = false - - new_checkpoint = updates["checkpoint"] - - @logger.info "Updating repository: #{@repository.name}: #{updated_files.size} files. Checkpoint: #{last_checkpoint} ==> #{new_checkpoint}" if updated_files.size > 0 + new_checkpoint = updates["checkpoint"] || 0 + if updated_files.empty? + return new_checkpoint + end + @logger.info "Updating repository: #{@repository.name}: #{updated_files.size} files. Checkpoint: #{last_checkpoint} ==> #{new_checkpoint}" @async_requests = [] - callbacks_on_update = [] callbacks_on_no_change = [] callbacks_on_delete = [] - + has_update_file_failure = false updated_files.each do |raw_file| file = File.new(raw_file) local_file_path = ::File.join(@repository.local_path, file.path) if file.state == "DELETED" then delete_file!(local_file_path) @@ -48,47 +47,60 @@ if update_file!(file, local_file_path) then yield file callbacks_on_update << OpenStruct.new({ :file => file, :local_file_path => local_file_path }) else @logger.error "Unable to get file #{file.url} ==> #{file.path}!" - failure = true + has_update_file_failure = true + break end else @logger.debug "File exists with correct hash: #{local_file_path}" callbacks_on_no_change << OpenStruct.new({:file => file, :local_file_path => local_file_path}) end end end - @hydra.run - pfailure = failure || @async_requests.any? do |request| - @logger.warn "request did not handle response: #{request.inspect}" if request.response.nil? || request.response.code != 200 - request.response.nil? || request.response.code != 200 + if has_update_file_failure + @logger.error "FAILED to fetch files for #{@repository.name} last_checkpoint = #{last_checkpoint}" + return last_checkpoint end + @logger.info "Done setting up async requests for #{@repository.name}, starting fetch..." + has_fetch_failures = begin + @hydra.run + # Magnus 20160305 - since we now fail fast by raising a RuntimeError on response.code != 200 + # I think we can remove the following request validation + @async_requests.any? do |request| + @logger.warn "request did not handle response: #{request.inspect}" if request.response.nil? || request.response.code != 200 + request.response.nil? || request.response.code != 200 + end + rescue RuntimeError, SystemCallError, StandardError => e + msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t") + @logger.error "FAILED to fetch files for #{@repository.name} last_checkpoint = #{last_checkpoint}; #{msg}" + true + end - if pfailure == false - @logger.info "Done fetching files for #{@repository.name}, processing callbacks..." - begin - - run_callbacks_delete(callbacks_on_delete) - - run_callbacks(callbacks_on_update) - - run_callbacks_no_change(callbacks_on_no_change) + if has_fetch_failures + @logger.error "FAILED to fetch files for #{@repository.name} last_checkpoint = #{last_checkpoint}" + return last_checkpoint + end - new_checkpoint || last_checkpoint - rescue RuntimeError, SystemCallError, StandardError => e - msg = ["Callbacks failed to run; #{e.class} - #{e.message}", *e.backtrace].join("\n\t") - @logger.error "FAILED to update files for #{@repository.name} last_checkpoint = #{last_checkpoint}; #{msg}" - last_checkpoint - end - else - @logger.error "FAILED to update files for #{@repository.name} last_checkpoint = #{last_checkpoint}" + @logger.info "Done fetching files for #{@repository.name}, processing callbacks..." + new_or_last_checkpoint = begin + run_callbacks_delete(callbacks_on_delete) + run_callbacks(callbacks_on_update) + run_callbacks_no_change(callbacks_on_no_change) + + new_checkpoint || last_checkpoint + rescue RuntimeError, SystemCallError, StandardError => e + msg = ["Callbacks failed to run; #{e.class} - #{e.message}", *e.backtrace].join("\n\t") + @logger.error "FAILED to process callbacks for #{@repository.name} last_checkpoint = #{last_checkpoint}; #{msg}" last_checkpoint end + + new_or_last_checkpoint end - + def update_files!(last_checkpoint) with_updated_files(last_checkpoint) {} end def register_updated_file!(path, url, hash = nil) @@ -148,11 +160,11 @@ raise "Failed to get last checkpoint for repository: #{@repository.name}" end return response.body.chomp.to_i end end - + private def log_failed_request(failed_request_command, e) return unless @failed_request_log_file_name ::File.open(@failed_request_log_file_name, 'a+') do |f| @@ -160,15 +172,15 @@ f.puts(failed_request_command) end end def update_file!(file, local_file_path) - + if file.url =~ /^http:\/\// update_http_file!(file, local_file_path) elsif (file.url =~ /^hdfs:\/\//) - success = update_hdfs_file!(file, local_file_path) + update_hdfs_file!(file, local_file_path) else raise "Unsupported protocol for file: #{file.inspect}" end end @@ -203,11 +215,11 @@ begin updates_http = Net::HTTP.new(@repository.host, @repository.port) updates_http.read_timeout = 300 # default is 60 seconds updates_http.start do |http| updates_path = "/#{UPDATES_ACTION}/#{@repository.name}?#{FROM_CHECKPOINT_KEY}=#{last_checkpoint}" - updates_path = "#{updates_path}&batch_size=#{@repository.batch_size}" if @repository.batch_size + updates_path = "#{updates_path}&batch_size=#{@repository.batch_size}" if @repository.batch_size > 0 begin @logger.info "Fetching updated file list from #{updates_path}" request = Net::HTTP::Get.new(updates_path) response = http.request(request) if response.code.to_i == 200 @@ -222,69 +234,83 @@ end end rescue StandardError => e @logger.error "Unable to create HTTP connection to #{@repository.host}:#{@repository.port} (#{e.message})!" return {} - end + end end def update_http_file!(file, local_file_path) begin async_request = if @repository.user - Typhoeus::Request.new(URI.encode(file.url), :follow_location => true, :username => @repository.user, :password => @repository.pass) + Typhoeus::Request.new( + URI.escape(file.url), + :followlocation => true, + :username => @repository.user, + :password => @repository.pass + ) else - Typhoeus::Request.new(URI.encode(file.url), :follow_location => true) + Typhoeus::Request.new( + URI.escape(file.url), + :followlocation => true + ) end + + downloaded_file = nil + async_request.on_headers do |response| + if response.code != 200 + raise "Failed to fetch response(#{response.code}) for url '#{response.effective_url}' ---\n\t #{response.inspect}" + end + @logger.debug("Updating: #{local_file_path}") + ::FileUtils.mkdir_p(::File.dirname(local_file_path)) + downloaded_file = ::File.new("#{local_file_path}.tmp", "wb") + end + + async_request.on_body do |chunk, response| + downloaded_file.write(chunk) if response.code == 200 + end + async_request.on_complete do |response| - success = false - begin - success = response.code.to_i == 200 - if success - write_file!(local_file_path, response.body) - else - body_if_error = response.code >= 500 ? ", body = #{response.body}" : "" - @logger.warn "Failed to update file #{file.url}, got status code = #{response.code}#{body_if_error}" - end - rescue SystemCallError, StandardError => e - msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t") - @logger.error "Failed to update file #{file.url}: #{msg}" + unless downloaded_file.nil? + downloaded_file.close + ::FileUtils.mv("#{local_file_path}.tmp", local_file_path) + else + ::FileUtils.rm("#{local_file_path}.tmp") if ::File.exist?("#{local_file_path}.tmp") end - # return the async_request.handled_response value here - success + true end @hydra.queue async_request @async_requests << async_request - rescue StandardError => e + true + rescue RuntimeError, SystemCallError, StandardError => e msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t") @logger.error "Failed to update file #{file.url}: #{msg}" - return false + false end - - return true end - + def update_hdfs_file!(file, local_file_path) begin ::FileUtils.mkdir_p(::File.dirname(local_file_path)) ::FileUtils.rm_f("#{local_file_path}.tmp") url = file.url url.gsub!(/hdfs:\/\/([^\/]*)(.*)/, "hdfs://\\2") hdfs_cmd = "#{@repository.hadoop_binary} dfs -copyToLocal #{url} #{local_file_path}.tmp" #@logger.debug "Trying to update #{local_file_path} with '#{hdfs_cmd}'" - + pid, stdin, stdout, stderr = Open4::popen4 hdfs_cmd ignored, status = Process::waitpid2 pid - + if status.exitstatus == 0 then begin ::FileUtils.mv "#{local_file_path}.tmp", local_file_path return true rescue StandardError => e msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t") @logger.error "Failed to move hdfs file #{file.url}: #{msg}" return false - end + end else @logger.error "Failed to update hdfs file #{file.url}! Unable to execute #{hdfs_cmd}" return false end rescue SystemCallError, StandardError => e @@ -293,12 +319,11 @@ return false end end def write_file!(file_path, contents) - ::FileUtils.mkdir_p(::File.dirname(file_path)) - @logger.debug("Updating: #{file_path}") + begin ::File.open("#{file_path}.tmp", 'w') do |file| file.write(contents) file.close end @@ -306,10 +331,10 @@ return true rescue StandardError => e msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t") @logger.error "Failed to move file #{file_path}: #{msg}" return false - end + end end def delete_file!(file_path) if ::File.exists?(file_path) then @logger.debug("Deleting: #{file_path}")