lib/filbunke/client.rb in filbunke-1.12.0 vs lib/filbunke/client.rb in filbunke-1.13.0
- old
+ new
@@ -23,22 +23,21 @@
end
def with_updated_files(last_checkpoint)
updates = get_updated_file_list(last_checkpoint)
updated_files = updates["files"] || []
- failure = false
-
- new_checkpoint = updates["checkpoint"]
-
- @logger.info "Updating repository: #{@repository.name}: #{updated_files.size} files. Checkpoint: #{last_checkpoint} ==> #{new_checkpoint}" if updated_files.size > 0
+ new_checkpoint = updates["checkpoint"] || 0
+ if updated_files.empty?
+ return new_checkpoint
+ end
+ @logger.info "Updating repository: #{@repository.name}: #{updated_files.size} files. Checkpoint: #{last_checkpoint} ==> #{new_checkpoint}"
@async_requests = []
-
callbacks_on_update = []
callbacks_on_no_change = []
callbacks_on_delete = []
-
+ has_update_file_failure = false
updated_files.each do |raw_file|
file = File.new(raw_file)
local_file_path = ::File.join(@repository.local_path, file.path)
if file.state == "DELETED" then
delete_file!(local_file_path)
@@ -48,47 +47,60 @@
if update_file!(file, local_file_path) then
yield file
callbacks_on_update << OpenStruct.new({ :file => file, :local_file_path => local_file_path })
else
@logger.error "Unable to get file #{file.url} ==> #{file.path}!"
- failure = true
+ has_update_file_failure = true
+ break
end
else
@logger.debug "File exists with correct hash: #{local_file_path}"
callbacks_on_no_change << OpenStruct.new({:file => file, :local_file_path => local_file_path})
end
end
end
- @hydra.run
- pfailure = failure || @async_requests.any? do |request|
- @logger.warn "request did not handle response: #{request.inspect}" if request.response.nil? || request.response.code != 200
- request.response.nil? || request.response.code != 200
+ if has_update_file_failure
+ @logger.error "FAILED to fetch files for #{@repository.name} last_checkpoint = #{last_checkpoint}"
+ return last_checkpoint
end
+ @logger.info "Done setting up async requests for #{@repository.name}, starting fetch..."
+ has_fetch_failures = begin
+ @hydra.run
+ # Magnus 20160305 - since we now fail fast by raising a RuntimeError on response.code != 200
+ # I think we can remove the following request validation
+ @async_requests.any? do |request|
+ @logger.warn "request did not handle response: #{request.inspect}" if request.response.nil? || request.response.code != 200
+ request.response.nil? || request.response.code != 200
+ end
+ rescue RuntimeError, SystemCallError, StandardError => e
+ msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t")
+ @logger.error "FAILED to fetch files for #{@repository.name} last_checkpoint = #{last_checkpoint}; #{msg}"
+ true
+ end
- if pfailure == false
- @logger.info "Done fetching files for #{@repository.name}, processing callbacks..."
- begin
-
- run_callbacks_delete(callbacks_on_delete)
-
- run_callbacks(callbacks_on_update)
-
- run_callbacks_no_change(callbacks_on_no_change)
+ if has_fetch_failures
+ @logger.error "FAILED to fetch files for #{@repository.name} last_checkpoint = #{last_checkpoint}"
+ return last_checkpoint
+ end
- new_checkpoint || last_checkpoint
- rescue RuntimeError, SystemCallError, StandardError => e
- msg = ["Callbacks failed to run; #{e.class} - #{e.message}", *e.backtrace].join("\n\t")
- @logger.error "FAILED to update files for #{@repository.name} last_checkpoint = #{last_checkpoint}; #{msg}"
- last_checkpoint
- end
- else
- @logger.error "FAILED to update files for #{@repository.name} last_checkpoint = #{last_checkpoint}"
+ @logger.info "Done fetching files for #{@repository.name}, processing callbacks..."
+ new_or_last_checkpoint = begin
+ run_callbacks_delete(callbacks_on_delete)
+ run_callbacks(callbacks_on_update)
+ run_callbacks_no_change(callbacks_on_no_change)
+
+ new_checkpoint || last_checkpoint
+ rescue RuntimeError, SystemCallError, StandardError => e
+ msg = ["Callbacks failed to run; #{e.class} - #{e.message}", *e.backtrace].join("\n\t")
+ @logger.error "FAILED to process callbacks for #{@repository.name} last_checkpoint = #{last_checkpoint}; #{msg}"
last_checkpoint
end
+
+ new_or_last_checkpoint
end
-
+
def update_files!(last_checkpoint)
with_updated_files(last_checkpoint) {}
end
def register_updated_file!(path, url, hash = nil)
@@ -148,11 +160,11 @@
raise "Failed to get last checkpoint for repository: #{@repository.name}"
end
return response.body.chomp.to_i
end
end
-
+
private
def log_failed_request(failed_request_command, e)
return unless @failed_request_log_file_name
::File.open(@failed_request_log_file_name, 'a+') do |f|
@@ -160,15 +172,15 @@
f.puts(failed_request_command)
end
end
def update_file!(file, local_file_path)
-
+
if file.url =~ /^http:\/\//
update_http_file!(file, local_file_path)
elsif (file.url =~ /^hdfs:\/\//)
- success = update_hdfs_file!(file, local_file_path)
+ update_hdfs_file!(file, local_file_path)
else
raise "Unsupported protocol for file: #{file.inspect}"
end
end
@@ -203,11 +215,11 @@
begin
updates_http = Net::HTTP.new(@repository.host, @repository.port)
updates_http.read_timeout = 300 # default is 60 seconds
updates_http.start do |http|
updates_path = "/#{UPDATES_ACTION}/#{@repository.name}?#{FROM_CHECKPOINT_KEY}=#{last_checkpoint}"
- updates_path = "#{updates_path}&batch_size=#{@repository.batch_size}" if @repository.batch_size
+ updates_path = "#{updates_path}&batch_size=#{@repository.batch_size}" if @repository.batch_size > 0
begin
@logger.info "Fetching updated file list from #{updates_path}"
request = Net::HTTP::Get.new(updates_path)
response = http.request(request)
if response.code.to_i == 200
@@ -222,69 +234,83 @@
end
end
rescue StandardError => e
@logger.error "Unable to create HTTP connection to #{@repository.host}:#{@repository.port} (#{e.message})!"
return {}
- end
+ end
end
def update_http_file!(file, local_file_path)
begin
async_request = if @repository.user
- Typhoeus::Request.new(URI.encode(file.url), :follow_location => true, :username => @repository.user, :password => @repository.pass)
+ Typhoeus::Request.new(
+ URI.escape(file.url),
+ :followlocation => true,
+ :username => @repository.user,
+ :password => @repository.pass
+ )
else
- Typhoeus::Request.new(URI.encode(file.url), :follow_location => true)
+ Typhoeus::Request.new(
+ URI.escape(file.url),
+ :followlocation => true
+ )
end
+
+ downloaded_file = nil
+ async_request.on_headers do |response|
+ if response.code != 200
+ raise "Failed to fetch response(#{response.code}) for url '#{response.effective_url}' ---\n\t #{response.inspect}"
+ end
+ @logger.debug("Updating: #{local_file_path}")
+ ::FileUtils.mkdir_p(::File.dirname(local_file_path))
+ downloaded_file = ::File.new("#{local_file_path}.tmp", "wb")
+ end
+
+ async_request.on_body do |chunk, response|
+ downloaded_file.write(chunk) if response.code == 200
+ end
+
async_request.on_complete do |response|
- success = false
- begin
- success = response.code.to_i == 200
- if success
- write_file!(local_file_path, response.body)
- else
- body_if_error = response.code >= 500 ? ", body = #{response.body}" : ""
- @logger.warn "Failed to update file #{file.url}, got status code = #{response.code}#{body_if_error}"
- end
- rescue SystemCallError, StandardError => e
- msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t")
- @logger.error "Failed to update file #{file.url}: #{msg}"
+ unless downloaded_file.nil?
+ downloaded_file.close
+ ::FileUtils.mv("#{local_file_path}.tmp", local_file_path)
+ else
+ ::FileUtils.rm("#{local_file_path}.tmp") if ::File.exist?("#{local_file_path}.tmp")
end
- # return the async_request.handled_response value here
- success
+ true
end
@hydra.queue async_request
@async_requests << async_request
- rescue StandardError => e
+ true
+ rescue RuntimeError, SystemCallError, StandardError => e
msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t")
@logger.error "Failed to update file #{file.url}: #{msg}"
- return false
+ false
end
-
- return true
end
-
+
def update_hdfs_file!(file, local_file_path)
begin
::FileUtils.mkdir_p(::File.dirname(local_file_path))
::FileUtils.rm_f("#{local_file_path}.tmp")
url = file.url
url.gsub!(/hdfs:\/\/([^\/]*)(.*)/, "hdfs://\\2")
hdfs_cmd = "#{@repository.hadoop_binary} dfs -copyToLocal #{url} #{local_file_path}.tmp"
#@logger.debug "Trying to update #{local_file_path} with '#{hdfs_cmd}'"
-
+
pid, stdin, stdout, stderr = Open4::popen4 hdfs_cmd
ignored, status = Process::waitpid2 pid
-
+
if status.exitstatus == 0 then
begin
::FileUtils.mv "#{local_file_path}.tmp", local_file_path
return true
rescue StandardError => e
msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t")
@logger.error "Failed to move hdfs file #{file.url}: #{msg}"
return false
- end
+ end
else
@logger.error "Failed to update hdfs file #{file.url}! Unable to execute #{hdfs_cmd}"
return false
end
rescue SystemCallError, StandardError => e
@@ -293,12 +319,11 @@
return false
end
end
def write_file!(file_path, contents)
- ::FileUtils.mkdir_p(::File.dirname(file_path))
- @logger.debug("Updating: #{file_path}")
+
begin
::File.open("#{file_path}.tmp", 'w') do |file|
file.write(contents)
file.close
end
@@ -306,10 +331,10 @@
return true
rescue StandardError => e
msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t")
@logger.error "Failed to move file #{file_path}: #{msg}"
return false
- end
+ end
end
def delete_file!(file_path)
if ::File.exists?(file_path) then
@logger.debug("Deleting: #{file_path}")