lib/filbunke/client.rb in filbunke-2.0.9 vs lib/filbunke/client.rb in filbunke-2.1.0

- old
+ new

@@ -8,11 +8,10 @@ LAST_CHECKPOINT_ACTION = 'last_checkpoint' TOUCH_ACTION = 'touch' URL_KEY = 'url' FROM_CHECKPOINT_KEY = 'from_checkpoint' HASH_KEY = 'hash' - URI_UNSAFE_CHARACTERS = '/[^.:\/\w-]/' def initialize(repository, logger, callbacks = [], failed_request_log_file_name = nil) @repository = repository @logger = logger @@ -24,22 +23,21 @@ end def with_updated_files(last_checkpoint) updates = get_updated_file_list(last_checkpoint) updated_files = updates["files"] || [] - failure = false - - new_checkpoint = updates["checkpoint"] - - @logger.info "Updating repository: #{@repository.name}: #{updated_files.size} files. Checkpoint: #{last_checkpoint} ==> #{new_checkpoint}" if updated_files.size > 0 + new_checkpoint = updates["checkpoint"] || 0 + if updated_files.empty? + return new_checkpoint + end + @logger.info "Updating repository: #{@repository.name}: #{updated_files.size} files. Checkpoint: #{last_checkpoint} ==> #{new_checkpoint}" @async_requests = [] - callbacks_on_update = [] callbacks_on_no_change = [] callbacks_on_delete = [] - + has_update_file_failure = false updated_files.each do |raw_file| file = File.new(raw_file) local_file_path = ::File.join(@repository.local_path, file.path) if file.state == "DELETED" then delete_file!(local_file_path) @@ -48,46 +46,61 @@ if file_needs_update?(file, local_file_path) if update_file!(file, local_file_path) then yield file callbacks_on_update << OpenStruct.new({ :file => file, :local_file_path => local_file_path }) else - @logger.error "Unable to get file #{file.url} ==> #{file.path}!" - failure = true + @logger.error "Unable to fetch file #{file.url} ==> #{file.path}!" + has_update_file_failure = true + break; end - + else @logger.debug "File exists with correct hash: #{local_file_path}" callbacks_on_no_change << OpenStruct.new({:file => file, :local_file_path => local_file_path}) end end end - @hydra.run - pfailure = failure || @async_requests.any? do |request| - @logger.warn "request did not handle response: #{request.inspect}" if request.response.nil? || request.response.code != 200 - request.response.nil? || request.response.code != 200 + if has_update_file_failure + @logger.error "FAILED to fetch files for #{@repository.name} last_checkpoint = #{last_checkpoint}" + return last_checkpoint end + @logger.info "Done setting up async requests for #{@repository.name}, starting fetch..." - if pfailure == false - @logger.info "Done fetching files for #{@repository.name}, processing callbacks..." - begin - run_callbacks_delete(callbacks_on_delete) - run_callbacks(callbacks_on_update) - run_callbacks_no_change(callbacks_on_no_change) - - new_checkpoint || last_checkpoint - rescue RuntimeError, SystemCallError, StandardError => e - msg = ["Callbacks failed to run; #{e.class} - #{e.message}", *e.backtrace].join("\n\t") - @logger.error "FAILED to update files for #{@repository.name} last_checkpoint = #{last_checkpoint}; #{msg}" - last_checkpoint + has_fetch_files_failure = begin + @hydra.run + @async_requests.any? do |request| + @logger.warn "request did not handle response: #{request.inspect}" if request.response.nil? || request.response.code != 200 + request.response.nil? || request.response.code != 200 end - else + rescue RuntimeError, SystemCallError, StandardError => e + msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t") + @logger.error "FAILED to fetch files for #{@repository.name} last_checkpoint = #{last_checkpoint}; #{msg}" + true + end + + if has_fetch_files_failure @logger.error "FAILED to update files for #{@repository.name} last_checkpoint = #{last_checkpoint}" + return last_checkpoint + end + + @logger.info "Done fetching files for #{@repository.name}, processing callbacks..." + new_or_last_checkpoint = begin + run_callbacks_delete(callbacks_on_delete) + run_callbacks(callbacks_on_update) + run_callbacks_no_change(callbacks_on_no_change) + + new_checkpoint || last_checkpoint + rescue RuntimeError, SystemCallError, StandardError => e + msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t") + @logger.error "FAILED to process callbacks for #{@repository.name} last_checkpoint = #{last_checkpoint}; #{msg}" last_checkpoint end + + new_or_last_checkpoint end - + def update_files!(last_checkpoint) with_updated_files(last_checkpoint) {} end def register_updated_file!(path, url, hash = nil) @@ -136,22 +149,22 @@ end end end def last_checkpoint - last_checkpoint_http = Net::HTTP.new(@repository.host, @repository.port) + last_checkpoint_http = Net::HTTP.new(@repository.host, @repository.port) last_checkpoint_http.start do |http| last_checkpoint_path = "/#{UPDATES_ACTION}/#{@repository.name}/#{LAST_CHECKPOINT_ACTION}" request = Net::HTTP::Get.new(last_checkpoint_path) response = http.request(request) if response.code.to_i != 200 raise "Failed to get last checkpoint for repository: #{@repository.name}" end return response.body.chomp.to_i end end - + private def log_failed_request(failed_request_command, e) return unless @failed_request_log_file_name ::File.open(@failed_request_log_file_name, 'a+') do |f| @@ -159,11 +172,11 @@ f.puts(failed_request_command) end end def update_file!(file, local_file_path) - + if file.url =~ /^http:\/\// update_http_file!(file, local_file_path) elsif (file.url =~ /^hdfs:\/\//) success = update_hdfs_file!(file, local_file_path) else @@ -202,10 +215,11 @@ begin updates_http = Net::HTTP.new(@repository.host, @repository.port) updates_http.read_timeout = 300 # default is 60 seconds updates_http.start do |http| updates_path = "/#{UPDATES_ACTION}/#{@repository.name}?#{FROM_CHECKPOINT_KEY}=#{last_checkpoint}" + updates_path = "#{updates_path}&batch_size=#{@repository.batch_size}" if @repository.batch_size > 0 begin @logger.info "Fetching updated file list from #{updates_path}" request = Net::HTTP::Get.new(updates_path) response = http.request(request) if response.code.to_i == 200 @@ -226,63 +240,84 @@ end def update_http_file!(file, local_file_path) begin async_request = if @repository.user - Typhoeus::Request.new(URI.encode(file.url, URI_UNSAFE_CHARACTERS), :followlocation => true, :username => @repository.user, :password => @repository.pass) + Typhoeus::Request.new( + URI.escape(file.url), + :followlocation => true, + :username => @repository.user, + :password => @repository.pass + ) else - Typhoeus::Request.new(URI.encode(file.url, URI_UNSAFE_CHARACTERS), :followlocation => true) + Typhoeus::Request.new( + URI.escape(file.url), + :followlocation => true + ) end + + downloaded_file = nil + async_request.on_headers do |response| + if response.code != 200 + raise "Downloading file #{response.effective_url} failed with status code #{response.code} --- #{response.inspect}" + end + ::FileUtils.mkdir_p(::File.dirname(local_file_path)) + downloaded_file = ::File.new("#{local_file_path}.tmp", "wb") + @logger.debug("Updating file #{local_file_path}") + end + + + async_request.on_body do |chunk, response| + downloaded_file.write(chunk) if response.code == 200 + end + async_request.on_complete do |response| - success = false - begin - success = response.code.to_i == 200 - if success - write_file!(local_file_path, response.body) + unless downloaded_file.nil? + downloaded_file.close unless downloaded_file.closed? + if response.code == 200 + ::FileUtils.mv "#{local_file_path}.tmp", local_file_path else - body_if_error = response.code >= 500 ? ", body = #{response.body}" : "" - @logger.warn "Failed to update file #{file.url}, got status code = #{response.code}#{body_if_error}" + ::FileUtils.rm "#{local_file_path}.tmp" if ::File.exist? "#{local_file_path}.tmp" end - rescue SystemCallError, StandardError => e - msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t") - @logger.error "Failed to update file #{file.url}: #{msg}" end - # return the async_request.handled_response value here - success + true end @hydra.queue async_request @async_requests << async_request - rescue StandardError => e + true + rescue RuntimeError, SystemCallError, StandardError => e msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t") @logger.error "Failed to update file #{file.url}: #{msg}" - return false + unless downloaded_file.nil? + downloaded_file.close unless downloaded_file.closed? + ::FileUtils.rm "#{local_file_path}.tmp" if ::File.exist? "#{local_file_path}.tmp" + end + false end - - return true end - + def update_hdfs_file!(file, local_file_path) begin ::FileUtils.mkdir_p(::File.dirname(local_file_path)) ::FileUtils.rm_f("#{local_file_path}.tmp") url = file.url url.gsub!(/hdfs:\/\/([^\/]*)(.*)/, "hdfs://\\2") hdfs_cmd = "#{@repository.hadoop_binary} dfs -copyToLocal #{url} #{local_file_path}.tmp" #@logger.debug "Trying to update #{local_file_path} with '#{hdfs_cmd}'" - + pid, stdin, stdout, stderr = Open4::popen4 hdfs_cmd ignored, status = Process::waitpid2 pid - + if status.exitstatus == 0 then begin ::FileUtils.mv "#{local_file_path}.tmp", local_file_path return true rescue StandardError => e msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t") @logger.error "Failed to move hdfs file #{file.url}: #{msg}" return false - end + end else @logger.error "Failed to update hdfs file #{file.url}! Unable to execute #{hdfs_cmd}" return false end rescue SystemCallError, StandardError => e @@ -290,32 +325,13 @@ @logger.error "Failed to update hdfs file #{file.url}: #{msg}" return false end end - def write_file!(file_path, contents) - ::FileUtils.mkdir_p(::File.dirname(file_path)) - @logger.debug("Updating: #{file_path}") - begin - ::File.open("#{file_path}.tmp", 'w') do |file| - file.write(contents) - file.close - end - ::FileUtils.mv "#{file_path}.tmp", file_path - return true - rescue StandardError => e - msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t") - @logger.error "Failed to move file #{file_path}: #{msg}" - return false - end - end - def delete_file!(file_path) if ::File.exists?(file_path) then @logger.debug("Deleting: #{file_path}") ::File.delete(file_path) end end - end end -