lib/filbunke/client.rb in filbunke-2.0.9 vs lib/filbunke/client.rb in filbunke-2.1.0
- old
+ new
@@ -8,11 +8,10 @@
LAST_CHECKPOINT_ACTION = 'last_checkpoint'
TOUCH_ACTION = 'touch'
URL_KEY = 'url'
FROM_CHECKPOINT_KEY = 'from_checkpoint'
HASH_KEY = 'hash'
- URI_UNSAFE_CHARACTERS = '/[^.:\/\w-]/'
def initialize(repository, logger, callbacks = [], failed_request_log_file_name = nil)
@repository = repository
@logger = logger
@@ -24,22 +23,21 @@
end
def with_updated_files(last_checkpoint)
updates = get_updated_file_list(last_checkpoint)
updated_files = updates["files"] || []
- failure = false
-
- new_checkpoint = updates["checkpoint"]
-
- @logger.info "Updating repository: #{@repository.name}: #{updated_files.size} files. Checkpoint: #{last_checkpoint} ==> #{new_checkpoint}" if updated_files.size > 0
+ new_checkpoint = updates["checkpoint"] || 0
+ if updated_files.empty?
+ return new_checkpoint
+ end
+ @logger.info "Updating repository: #{@repository.name}: #{updated_files.size} files. Checkpoint: #{last_checkpoint} ==> #{new_checkpoint}"
@async_requests = []
-
callbacks_on_update = []
callbacks_on_no_change = []
callbacks_on_delete = []
-
+ has_update_file_failure = false
updated_files.each do |raw_file|
file = File.new(raw_file)
local_file_path = ::File.join(@repository.local_path, file.path)
if file.state == "DELETED" then
delete_file!(local_file_path)
@@ -48,46 +46,61 @@
if file_needs_update?(file, local_file_path)
if update_file!(file, local_file_path) then
yield file
callbacks_on_update << OpenStruct.new({ :file => file, :local_file_path => local_file_path })
else
- @logger.error "Unable to get file #{file.url} ==> #{file.path}!"
- failure = true
+ @logger.error "Unable to fetch file #{file.url} ==> #{file.path}!"
+ has_update_file_failure = true
+ break;
end
-
+
else
@logger.debug "File exists with correct hash: #{local_file_path}"
callbacks_on_no_change << OpenStruct.new({:file => file, :local_file_path => local_file_path})
end
end
end
- @hydra.run
- pfailure = failure || @async_requests.any? do |request|
- @logger.warn "request did not handle response: #{request.inspect}" if request.response.nil? || request.response.code != 200
- request.response.nil? || request.response.code != 200
+ if has_update_file_failure
+ @logger.error "FAILED to fetch files for #{@repository.name} last_checkpoint = #{last_checkpoint}"
+ return last_checkpoint
end
+ @logger.info "Done setting up async requests for #{@repository.name}, starting fetch..."
- if pfailure == false
- @logger.info "Done fetching files for #{@repository.name}, processing callbacks..."
- begin
- run_callbacks_delete(callbacks_on_delete)
- run_callbacks(callbacks_on_update)
- run_callbacks_no_change(callbacks_on_no_change)
-
- new_checkpoint || last_checkpoint
- rescue RuntimeError, SystemCallError, StandardError => e
- msg = ["Callbacks failed to run; #{e.class} - #{e.message}", *e.backtrace].join("\n\t")
- @logger.error "FAILED to update files for #{@repository.name} last_checkpoint = #{last_checkpoint}; #{msg}"
- last_checkpoint
+ has_fetch_files_failure = begin
+ @hydra.run
+ @async_requests.any? do |request|
+ @logger.warn "request did not handle response: #{request.inspect}" if request.response.nil? || request.response.code != 200
+ request.response.nil? || request.response.code != 200
end
- else
+ rescue RuntimeError, SystemCallError, StandardError => e
+ msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t")
+ @logger.error "FAILED to fetch files for #{@repository.name} last_checkpoint = #{last_checkpoint}; #{msg}"
+ true
+ end
+
+ if has_fetch_files_failure
@logger.error "FAILED to update files for #{@repository.name} last_checkpoint = #{last_checkpoint}"
+ return last_checkpoint
+ end
+
+ @logger.info "Done fetching files for #{@repository.name}, processing callbacks..."
+ new_or_last_checkpoint = begin
+ run_callbacks_delete(callbacks_on_delete)
+ run_callbacks(callbacks_on_update)
+ run_callbacks_no_change(callbacks_on_no_change)
+
+ new_checkpoint || last_checkpoint
+ rescue RuntimeError, SystemCallError, StandardError => e
+ msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t")
+ @logger.error "FAILED to process callbacks for #{@repository.name} last_checkpoint = #{last_checkpoint}; #{msg}"
last_checkpoint
end
+
+ new_or_last_checkpoint
end
-
+
# Brings the local copy up to date starting from +last_checkpoint+ without
# performing any per-file work in the caller.
#
# Delegates to with_updated_files with a do-nothing block and returns
# whatever that call returns.
def update_files!(last_checkpoint)
  with_updated_files(last_checkpoint) do |_updated_file|
    # intentionally empty: no per-file hook is needed here
  end
end
def register_updated_file!(path, url, hash = nil)
@@ -136,22 +149,22 @@
end
end
end
# Fetches the most recent checkpoint for this repository from the server.
#
# Issues GET /<UPDATES_ACTION>/<repository name>/<LAST_CHECKPOINT_ACTION>
# against @repository.host on @repository.port.
#
# Returns the response body, chomped and converted with to_i.
# Raises RuntimeError when the response status is not 200.
def last_checkpoint
- last_checkpoint_http = Net::HTTP.new(@repository.host, @repository.port)
+ last_checkpoint_http = Net::HTTP.new(@repository.host, @repository.port)
last_checkpoint_http.start do |http|
last_checkpoint_path = "/#{UPDATES_ACTION}/#{@repository.name}/#{LAST_CHECKPOINT_ACTION}"
request = Net::HTTP::Get.new(last_checkpoint_path)
response = http.request(request)
# response.code is a String, hence the to_i before comparing
if response.code.to_i != 200
raise "Failed to get last checkpoint for repository: #{@repository.name}"
end
# `return` inside the block exits the whole method with the parsed value;
# note that to_i yields 0 for a non-numeric body instead of raising
return response.body.chomp.to_i
end
end
-
+
private
def log_failed_request(failed_request_command, e)
return unless @failed_request_log_file_name
::File.open(@failed_request_log_file_name, 'a+') do |f|
@@ -159,11 +172,11 @@
f.puts(failed_request_command)
end
end
def update_file!(file, local_file_path)
-
+
if file.url =~ /^http:\/\//
update_http_file!(file, local_file_path)
elsif (file.url =~ /^hdfs:\/\//)
success = update_hdfs_file!(file, local_file_path)
else
@@ -202,10 +215,11 @@
begin
updates_http = Net::HTTP.new(@repository.host, @repository.port)
updates_http.read_timeout = 300 # default is 60 seconds
updates_http.start do |http|
updates_path = "/#{UPDATES_ACTION}/#{@repository.name}?#{FROM_CHECKPOINT_KEY}=#{last_checkpoint}"
+ updates_path = "#{updates_path}&batch_size=#{@repository.batch_size}" if @repository.batch_size > 0
begin
@logger.info "Fetching updated file list from #{updates_path}"
request = Net::HTTP::Get.new(updates_path)
response = http.request(request)
if response.code.to_i == 200
@@ -226,63 +240,84 @@
end
def update_http_file!(file, local_file_path)
begin
async_request = if @repository.user
- Typhoeus::Request.new(URI.encode(file.url, URI_UNSAFE_CHARACTERS), :followlocation => true, :username => @repository.user, :password => @repository.pass)
+ Typhoeus::Request.new(
+ URI.escape(file.url),
+ :followlocation => true,
+ :username => @repository.user,
+ :password => @repository.pass
+ )
else
- Typhoeus::Request.new(URI.encode(file.url, URI_UNSAFE_CHARACTERS), :followlocation => true)
+ Typhoeus::Request.new(
+ URI.escape(file.url),
+ :followlocation => true
+ )
end
+
+ downloaded_file = nil
+ async_request.on_headers do |response|
+ if response.code != 200
+ raise "Downloading file #{response.effective_url} failed with status code #{response.code} --- #{response.inspect}"
+ end
+ ::FileUtils.mkdir_p(::File.dirname(local_file_path))
+ downloaded_file = ::File.new("#{local_file_path}.tmp", "wb")
+ @logger.debug("Updating file #{local_file_path}")
+ end
+
+
+ async_request.on_body do |chunk, response|
+ downloaded_file.write(chunk) if response.code == 200
+ end
+
async_request.on_complete do |response|
- success = false
- begin
- success = response.code.to_i == 200
- if success
- write_file!(local_file_path, response.body)
+ unless downloaded_file.nil?
+ downloaded_file.close unless downloaded_file.closed?
+ if response.code == 200
+ ::FileUtils.mv "#{local_file_path}.tmp", local_file_path
else
- body_if_error = response.code >= 500 ? ", body = #{response.body}" : ""
- @logger.warn "Failed to update file #{file.url}, got status code = #{response.code}#{body_if_error}"
+ ::FileUtils.rm "#{local_file_path}.tmp" if ::File.exist? "#{local_file_path}.tmp"
end
- rescue SystemCallError, StandardError => e
- msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t")
- @logger.error "Failed to update file #{file.url}: #{msg}"
end
- # return the async_request.handled_response value here
- success
+ true
end
@hydra.queue async_request
@async_requests << async_request
- rescue StandardError => e
+ true
+ rescue RuntimeError, SystemCallError, StandardError => e
msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t")
@logger.error "Failed to update file #{file.url}: #{msg}"
- return false
+ unless downloaded_file.nil?
+ downloaded_file.close unless downloaded_file.closed?
+ ::FileUtils.rm "#{local_file_path}.tmp" if ::File.exist? "#{local_file_path}.tmp"
+ end
+ false
end
-
- return true
end
-
+
def update_hdfs_file!(file, local_file_path)
begin
::FileUtils.mkdir_p(::File.dirname(local_file_path))
::FileUtils.rm_f("#{local_file_path}.tmp")
url = file.url
url.gsub!(/hdfs:\/\/([^\/]*)(.*)/, "hdfs://\\2")
hdfs_cmd = "#{@repository.hadoop_binary} dfs -copyToLocal #{url} #{local_file_path}.tmp"
#@logger.debug "Trying to update #{local_file_path} with '#{hdfs_cmd}'"
-
+
pid, stdin, stdout, stderr = Open4::popen4 hdfs_cmd
ignored, status = Process::waitpid2 pid
-
+
if status.exitstatus == 0 then
begin
::FileUtils.mv "#{local_file_path}.tmp", local_file_path
return true
rescue StandardError => e
msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t")
@logger.error "Failed to move hdfs file #{file.url}: #{msg}"
return false
- end
+ end
else
@logger.error "Failed to update hdfs file #{file.url}! Unable to execute #{hdfs_cmd}"
return false
end
rescue SystemCallError, StandardError => e
@@ -290,32 +325,13 @@
@logger.error "Failed to update hdfs file #{file.url}: #{msg}"
return false
end
end
- def write_file!(file_path, contents)
- ::FileUtils.mkdir_p(::File.dirname(file_path))
- @logger.debug("Updating: #{file_path}")
- begin
- ::File.open("#{file_path}.tmp", 'w') do |file|
- file.write(contents)
- file.close
- end
- ::FileUtils.mv "#{file_path}.tmp", file_path
- return true
- rescue StandardError => e
- msg = ["#{e.class} - #{e.message}", *e.backtrace].join("\n\t")
- @logger.error "Failed to move file #{file_path}: #{msg}"
- return false
- end
- end
-
# Removes the file at +file_path+ from the local file system if it exists.
# A path that does not exist is ignored and nothing is logged.
#
# @param file_path [String] path of the local file to remove
# @return [Integer, nil] the result of File.delete when a file was removed,
#   nil when there was nothing to delete
# @raise [SystemCallError] if the path exists but cannot be deleted
def delete_file!(file_path)
  # File.exists? was deprecated and then removed in Ruby 3.2; File.exist?
  # is the supported spelling and performs the same check.
  return unless ::File.exist?(file_path)

  @logger.debug("Deleting: #{file_path}")
  ::File.delete(file_path)
end
-
end
end
-