lib/datasets/downloader.rb in red-datasets-0.1.5 vs lib/datasets/downloader.rb in red-datasets-0.1.6

- old
+ new

@@ -20,54 +20,119 @@ unless @url.is_a?(URI::HTTP) raise ArgumentError, "download URL must be HTTP or HTTPS: <#{@url}>" end end - def download(output_path) - return if output_path.exist? + def download(output_path, &block) + if output_path.exist? + yield_chunks(output_path, &block) if block_given? + return + end - output_path.parent.mkpath - - headers = { - "Accept-Encoding" => "identity", - "User-Agent" => "Red Datasets/#{VERSION}", - } - start = nil partial_output_path = Pathname.new("#{output_path}.partial") - if partial_output_path.exist? - start = partial_output_path.size - headers["Range"] = "bytes=#{start}-" - end + synchronize(output_path, partial_output_path) do + output_path.parent.mkpath - start_http(@url, headers) do |response| - if response.is_a?(Net::HTTPPartialContent) - mode = "ab" - else + n_retries = 0 + n_max_retries = 5 + begin + headers = { + "Accept-Encoding" => "identity", + "User-Agent" => "Red Datasets/#{VERSION}", + } start = nil - mode = "wb" - end + if partial_output_path.exist? + start = partial_output_path.size + headers["Range"] = "bytes=#{start}-" + end - base_name = @url.path.split("/").last - size_current = 0 - size_max = response.content_length - if start - size_current += start - size_max += start + start_http(@url, headers) do |response| + if response.is_a?(Net::HTTPPartialContent) + mode = "ab" + else + start = nil + mode = "wb" + end + + base_name = @url.path.split("/").last + size_current = 0 + size_max = response.content_length + if start + size_current += start + size_max += start + if block_given? and n_retries.zero? + yield_chunks(partial_output_path, &block) + end + end + progress_reporter = ProgressReporter.new(base_name, size_max) + partial_output_path.open(mode) do |output| + response.read_body do |chunk| + size_current += chunk.bytesize + progress_reporter.report(size_current) + output.write(chunk) + yield(chunk) if block_given? + end + end + end + FileUtils.mv(partial_output_path, output_path) + rescue Net::ReadTimeout => error + n_retries += 1 + retry if n_retries < n_max_retries + raise + rescue TooManyRedirects => error + last_url = error.message[/\Atoo many redirections: (.+)\z/, 1] + raise TooManyRedirects, "too many redirections: #{@url} .. #{last_url}" end - progress_reporter = ProgressReporter.new(base_name, size_max) - partial_output_path.open(mode) do |output| - response.read_body do |chunk| - size_current += chunk.bytesize - progress_reporter.report(size_current) - output.write(chunk) + end + end + + private def synchronize(output_path, partial_output_path) + begin + Process.getpgid(Process.pid) + rescue NotImplementedError + return yield + end + + lock_path = Pathname("#{output_path}.lock") + loop do + lock_path.parent.mkpath + begin + lock = lock_path.open(File::RDWR | File::CREAT | File::EXCL) + rescue SystemCallError + valid_lock_path = true + begin + pid = Integer(lock_path.read.chomp, 10) + rescue ArgumentError + # The process that acquired the lock will be exited before + # it stores its process ID. + valid_lock_path = (lock_path.mtime > 10) + else + begin + Process.getpgid(pid) + rescue SystemCallError + # Process that acquired the lock doesn't exist + valid_lock_path = false + end end + if valid_lock_path + sleep(1 + rand(10)) + else + lock_path.delete + end + retry + else + begin + lock.puts(Process.pid.to_s) + lock.flush + yield + ensure + lock.close + lock_path.delete + end + break end end - FileUtils.mv(partial_output_path, output_path) - rescue TooManyRedirects => error - last_url = error.message[/\Atoo many redirections: (.+)\z/, 1] - raise TooManyRedirects, "too many redirections: #{@url} .. #{last_url}" end private def start_http(url, headers, limit = 10, &block) if limit == 0 raise TooManyRedirects, "too many redirections: #{url}" @@ -93,9 +158,19 @@ message += ": #{response.message}" end message += ": #{url}" raise response.error_type.new(message, response) end + end + end + end + + private def yield_chunks(path) + path.open("rb") do |output| + chunk_size = 1024 * 1024 + chunk = "" + while output.read(chunk_size, chunk) + yield(chunk) end end end class ProgressReporter