lib/datasets/downloader.rb in red-datasets-0.1.5 vs lib/datasets/downloader.rb in red-datasets-0.1.6
- old
+ new
@@ -20,54 +20,119 @@
unless @url.is_a?(URI::HTTP)
raise ArgumentError, "download URL must be HTTP or HTTPS: <#{@url}>"
end
end
- def download(output_path)
- return if output_path.exist?
+ # Downloads @url to output_path, resuming from "#{output_path}.partial"
+ # when that file already exists.
+ #
+ # When a block is given, content is streamed to it chunk by chunk:
+ # * if output_path already exists, its content is replayed via
+ #   yield_chunks and nothing is downloaded;
+ # * if the server honors the Range request, the already downloaded
+ #   part of the partial file is replayed first (first attempt only);
+ # * every chunk received from the server is yielded after it has been
+ #   written to the partial file.
+ #
+ # NOTE(review): when a retry gets a full (non-partial) response, the
+ # partial file is rewritten from the start and those chunks are yielded
+ # again, so the block may see data it has already received.
+ #
+ # @param output_path [Pathname] destination of the finished download
+ # @yieldparam chunk [String] a piece of the content
+ # @raise [Net::ReadTimeout] when the 5th attempt also times out
+ # @raise [TooManyRedirects] re-raised with both the original URL and
+ #   the URL extracted from the original error message
+ def download(output_path, &block)
+ if output_path.exist?
+ yield_chunks(output_path, &block) if block_given?
+ return
+ end
- output_path.parent.mkpath
-
- headers = {
- "Accept-Encoding" => "identity",
- "User-Agent" => "Red Datasets/#{VERSION}",
- }
- start = nil
partial_output_path = Pathname.new("#{output_path}.partial")
- if partial_output_path.exist?
- start = partial_output_path.size
- headers["Range"] = "bytes=#{start}-"
- end
+ # The whole download runs inside synchronize, which guards it with
+ # the "#{output_path}.lock" lock file.
+ synchronize(output_path, partial_output_path) do
+ output_path.parent.mkpath
- start_http(@url, headers) do |response|
- if response.is_a?(Net::HTTPPartialContent)
- mode = "ab"
- else
+ n_retries = 0
+ n_max_retries = 5
+ begin
+ # Headers are rebuilt on every attempt so that a retry resumes
+ # from the current size of the partial file.
+ headers = {
+ "Accept-Encoding" => "identity",
+ "User-Agent" => "Red Datasets/#{VERSION}",
+ }
start = nil
- mode = "wb"
- end
+ if partial_output_path.exist?
+ start = partial_output_path.size
+ headers["Range"] = "bytes=#{start}-"
+ end
- base_name = @url.path.split("/").last
- size_current = 0
- size_max = response.content_length
- if start
- size_current += start
- size_max += start
+ start_http(@url, headers) do |response|
+ if response.is_a?(Net::HTTPPartialContent)
+ mode = "ab"
+ else
+ # The server ignored the Range request: start over.
+ start = nil
+ mode = "wb"
+ end
+
+ base_name = @url.path.split("/").last
+ size_current = 0
+ # content_length is nil when the response carries no
+ # Content-Length header (e.g. chunked transfer encoding).
+ size_max = response.content_length
+ if start
+ size_current += start
+ size_max += start if size_max
+ if block_given? && n_retries.zero?
+ yield_chunks(partial_output_path, &block)
+ end
+ end
+ progress_reporter = ProgressReporter.new(base_name, size_max)
+ partial_output_path.open(mode) do |output|
+ response.read_body do |chunk|
+ size_current += chunk.bytesize
+ progress_reporter.report(size_current)
+ output.write(chunk)
+ yield(chunk) if block_given?
+ end
+ end
+ end
+ # Only a completely received file is moved to output_path.
+ FileUtils.mv(partial_output_path, output_path)
+ rescue Net::ReadTimeout
+ n_retries += 1
+ retry if n_retries < n_max_retries
+ raise
+ rescue TooManyRedirects => error
+ last_url = error.message[/\Atoo many redirections: (.+)\z/, 1]
+ raise TooManyRedirects, "too many redirections: #{@url} .. #{last_url}"
end
- progress_reporter = ProgressReporter.new(base_name, size_max)
- partial_output_path.open(mode) do |output|
- response.read_body do |chunk|
- size_current += chunk.bytesize
- progress_reporter.report(size_current)
- output.write(chunk)
+ end
+ end
+
+ # Runs the given block while holding an inter-process lock backed by
+ # the "#{output_path}.lock" file.
+ #
+ # The lock file is created exclusively and stores the owner's process
+ # ID. When the file already exists, the lock is treated as stale and
+ # removed if the recorded process no longer exists, or if no valid
+ # process ID has been written and the file is at least 10 seconds old.
+ # Otherwise this method sleeps for 1 to 10 seconds and tries again.
+ #
+ # Where Process.getpgid is not implemented, the block is run without
+ # any locking and its result is returned.
+ #
+ # @param output_path [Pathname] path the lock file name is derived from
+ # @param partial_output_path [Pathname] not used by this method
+ # @yield the work to run while the lock is held
+ private def synchronize(output_path, partial_output_path)
+ begin
+ Process.getpgid(Process.pid)
+ rescue NotImplementedError
+ # The liveness check on lock owners below needs Process.getpgid.
+ return yield
+ end
+
+ lock_path = Pathname("#{output_path}.lock")
+ loop do
+ lock_path.parent.mkpath
+ begin
+ # File::EXCL makes the open fail when the lock file already exists.
+ lock = lock_path.open(File::RDWR | File::CREAT | File::EXCL)
+ rescue SystemCallError
+ valid_lock_path = true
+ begin
+ pid = Integer(lock_path.read.chomp, 10)
+ rescue ArgumentError
+ # No valid process ID is stored yet: the owner is either about
+ # to write it or exited before it could. Keep honoring the lock
+ # only while it is younger than 10 seconds.
+ valid_lock_path = ((Time.now - lock_path.mtime) < 10)
+ else
+ begin
+ Process.getpgid(pid)
+ rescue SystemCallError
+ # The process that acquired the lock no longer exists.
+ valid_lock_path = false
+ end
end
+ if valid_lock_path
+ # Random delay so that waiting processes don't retry in lockstep.
+ sleep(1 + rand(10))
+ else
+ lock_path.delete
+ end
+ retry
+ else
+ begin
+ lock.puts(Process.pid.to_s)
+ lock.flush
+ yield
+ ensure
+ # Release the lock even when the block raises.
+ lock.close
+ lock_path.delete
+ end
+ break
end
end
- FileUtils.mv(partial_output_path, output_path)
- rescue TooManyRedirects => error
- last_url = error.message[/\Atoo many redirections: (.+)\z/, 1]
- raise TooManyRedirects, "too many redirections: #{@url} .. #{last_url}"
end
private def start_http(url, headers, limit = 10, &block)
if limit == 0
raise TooManyRedirects, "too many redirections: #{url}"
@@ -93,9 +158,19 @@
message += ": #{response.message}"
end
message += ": #{url}"
raise response.error_type.new(message, response)
end
+ end
+ end
+ end
+
+ # Reads the file at path in binary mode and yields its content in
+ # chunks of at most 1 MiB. Nothing is yielded for an empty file.
+ #
+ # Each chunk is a fresh String, so callers may keep a reference to it
+ # without it being overwritten by the next read.
+ #
+ # @param path [Pathname] file to read
+ # @yieldparam chunk [String] next piece of the file content
+ private def yield_chunks(path)
+ path.open("rb") do |input|
+ chunk_size = 1024 * 1024
+ # IO#read(length) returns nil at end of file, which ends the loop.
+ while (chunk = input.read(chunk_size))
+ yield(chunk)
end
end
end
class ProgressReporter