require 'net/ssh/loggable'

module Net; module SFTP; module Operations

  # A general purpose downloader module for Net::SFTP. It can download files
  # into IO objects, or directly to files on the local file system. It can
  # even download entire directory trees via SFTP, and provides a flexible
  # progress reporting mechanism.
  #
  # To download a single file from the remote server, simply specify both the
  # remote and local paths:
  #
  #   downloader = sftp.download("/path/to/remote.txt", "/path/to/local.txt")
  #
  # By default, this operates asynchronously, so if you want to block until
  # the download finishes, you can use the 'bang' variant:
  #
  #   sftp.download!("/path/to/remote.txt", "/path/to/local.txt")
  #
  # Or, if you have multiple downloads that you want to run in parallel, you can
  # employ the #wait method of the returned object:
  #
  #   dls = %w(file1 file2 file3).map { |f| sftp.download("remote/#{f}", f) }
  #   dls.each { |d| d.wait }
  #
  # To download an entire directory tree, recursively, simply specify :recursive => true:
  #
  #   sftp.download!("/path/to/remotedir", "/path/to/local", :recursive => true)
  #
  # This will download "/path/to/remotedir", its contents, its subdirectories,
  # and their contents, recursively, to "/path/to/local" on the local host.
  # (If you specify :recursive => true and the source is not a directory,
  # you'll get an error!)
  #
  # If you want to pull the contents of a file on the remote server, and store
  # the data in memory rather than immediately to disk, you can pass an IO
  # object as the destination:
  #
  #   require 'stringio'
  #   io = StringIO.new
  #   sftp.download!("/path/to/remote", io)
  #
  # This will only work for single-file downloads. Trying to do so with
  # :recursive => true will cause an error.
  #
  # The following options are supported:
  #
  # * :progress - either a block or an object to act as a progress
  #   callback. See the discussion of "progress monitoring" below.
  # * :requests - the number of pending SFTP requests to allow at
  #   any given time. When downloading an entire directory tree recursively,
  #   this will default to 16 (and to 2 for a single-file download). Setting
  #   this higher might improve throughput. Reducing it will reduce throughput.
  # * :read_size - the maximum number of bytes to read at a time
  #   from the source. Increasing this value might improve throughput. It
  #   defaults to 32,000 bytes.
  #
  # == Progress Monitoring
  #
  # Sometimes it is desirable to track the progress of a download. There are
  # two ways to do this: either using a callback block, or a special custom
  # object.
  #
  # Using a block it's pretty straightforward:
  #
  #   sftp.download!("remote", "local") do |event, downloader, *args|
  #     case event
  #     when :open then
  #       # args[0] : file metadata
  #       puts "starting download: #{args[0].remote} -> #{args[0].local} (#{args[0].size} bytes)"
  #     when :get then
  #       # args[0] : file metadata
  #       # args[1] : byte offset in remote file
  #       # args[2] : data that was received
  #       puts "writing #{args[2].length} bytes to #{args[0].local} starting at #{args[1]}"
  #     when :close then
  #       # args[0] : file metadata
  #       puts "finished with #{args[0].remote}"
  #     when :mkdir then
  #       # args[0] : local path name
  #       puts "creating directory #{args[0]}"
  #     when :finish then
  #       puts "all done!"
  #     end
  #   end
  #
  # However, for more complex implementations (e.g., GUI interfaces and such)
  # a block can become cumbersome. In those cases, you can create custom
  # handler objects that respond to certain methods, and then pass your handler
  # to the downloader:
  #
  #   class CustomHandler
  #     def on_open(downloader, file)
  #       puts "starting download: #{file.remote} -> #{file.local} (#{file.size} bytes)"
  #     end
  #
  #     def on_get(downloader, file, offset, data)
  #       puts "writing #{data.length} bytes to #{file.local} starting at #{offset}"
  #     end
  #
  #     def on_close(downloader, file)
  #       puts "finished with #{file.remote}"
  #     end
  #
  #     def on_mkdir(downloader, path)
  #       puts "creating directory #{path}"
  #     end
  #
  #     def on_finish(downloader)
  #       puts "all done!"
  #     end
  #   end
  #
  #   sftp.download!("remote", "local", :progress => CustomHandler.new)
  #
  # If you omit any of those methods, the progress updates for those missing
  # events will be ignored. You can create a catchall method named "call" for
  # those, instead.
  class Download
    include Net::SSH::Loggable

    # The destination of the download (the name of a file or directory on
    # the local server, or an IO object)
    attr_reader :local

    # The source of the download (the name of a file or directory on the
    # remote server)
    attr_reader :remote

    # The hash of options that was given to this Download instance.
    attr_reader :options

    # The SFTP session instance that drives this download.
    attr_reader :sftp

    # The properties hash for this object
    attr_reader :properties

    # Instantiates a new downloader process on top of the given SFTP session.
    # +local+ is either an IO object that should receive the data, or a string
    # identifying the target file or directory on the local host. +remote+ is
    # a string identifying the location on the remote host that the download
    # should source.
    #
    # This will return immediately, and requires that the SSH event loop be
    # run in order to effect the download. (See #wait.)
    #
    # Raises ArgumentError if :recursive is set and +local+ is an IO-like
    # object (a directory tree cannot be downloaded in-memory).
    def initialize(sftp, local, remote, options={}, &progress)
      @sftp = sftp
      @local = local
      @remote = remote
      # an explicit block takes precedence over the :progress option
      @progress = progress || options[:progress]
      @options = options
      # number of outstanding SFTP requests/open handles; drives #active?
      @active = 0
      @properties = options[:properties] || {}

      self.logger = sftp.logger

      if recursive? && local.respond_to?(:write)
        raise ArgumentError, "cannot download a directory tree in-memory"
      end

      # seed the work queue with the root entry and kick off the first requests
      @stack = [Entry.new(remote, local, recursive?)]
      process_next_entry
    end

    # Returns the value of the :recursive key in the options hash that was
    # given when the object was instantiated.
    def recursive?
      options[:recursive]
    end

    # Returns true if there are any active requests or pending files or
    # directories.
    def active?
      @active > 0 || stack.any?
    end

    # Forces the transfer to stop.
    def abort!
      @active = 0
      @stack.clear
    end

    # Runs the SSH event loop for as long as the downloader is active (see
    # #active?). This can be used to block until the download completes.
    def wait
      sftp.loop { active? }
      self
    end

    # Returns the property with the given name. This allows Download instances
    # to store their own state when used as part of a state machine.
    def [](name)
      @properties[name.to_sym]
    end

    # Sets the given property to the given value. This allows Download instances
    # to store their own state when used as part of a state machine.
    def []=(name, value)
      @properties[name.to_sym] = value
    end

    private

      # A simple struct for encapsulating information about a single remote
      # file or directory that needs to be downloaded.
      Entry = Struct.new(:remote, :local, :directory, :size, :handle, :offset, :sink)

      #--
      # "ruby -w" hates private attributes, so we have to do these longhand
      #++

      # The stack of Entry instances, indicating which files and directories
      # on the remote host remain to be downloaded.
      def stack; @stack; end

      # The progress handler for this instance. Possibly nil.
      def progress; @progress; end

      # The default read size.
      DEFAULT_READ_SIZE = 32_000

      # The number of bytes to read at a time from remote files.
      def read_size
        options[:read_size] || DEFAULT_READ_SIZE
      end

      # The number of simultaneous SFTP requests to use to effect the download.
      # Defaults to 16 for recursive downloads, and 2 otherwise.
      def requests
        options[:requests] || (recursive? ? 16 : 2)
      end

      # Enqueues as many files and directories from the stack as possible
      # (see #requests). Each dequeued entry bumps @active; the corresponding
      # close callback (#on_close / #on_closedir) decrements it again.
      def process_next_entry
        while stack.any? && requests > @active
          entry = stack.shift
          @active += 1

          if entry.directory
            update_progress(:mkdir, entry.local)
            ::Dir.mkdir(entry.local) unless ::File.directory?(entry.local)
            request = sftp.opendir(entry.remote, &method(:on_opendir))
            request[:entry] = entry
          else
            open_file(entry)
          end
        end

        update_progress(:finish) if !active?
      end

      # Called when a remote directory is "opened" for reading, e.g. to
      # enumerate its contents. Starts a readdir operation if the opendir
      # operation was successful.
      def on_opendir(response)
        entry = response.request[:entry]
        raise StatusException.new(response, "opendir #{entry.remote}") unless response.ok?
        entry.handle = response[:handle]
        request = sftp.readdir(response[:handle], &method(:on_readdir))
        request[:parent] = entry
      end

      # Called when the next batch of items is read from a directory on the
      # remote server. If any items were read, they are added to the queue
      # and #process_next_entry is called.
      def on_readdir(response)
        entry = response.request[:parent]
        if response.eof?
          # directory fully listed: close its remote handle
          request = sftp.close(entry.handle, &method(:on_closedir))
          request[:parent] = entry
        elsif !response.ok?
          raise StatusException.new(response, "readdir #{entry.remote}")
        else
          response[:names].each do |item|
            next if item.name == "." || item.name == ".."
            stack << Entry.new(::File.join(entry.remote, item.name), ::File.join(entry.local, item.name), item.directory?, item.attributes.size)
          end

          # take this opportunity to enqueue more requests
          process_next_entry

          # keep reading until the server reports EOF for this directory
          request = sftp.readdir(entry.handle, &method(:on_readdir))
          request[:parent] = entry
        end
      end

      # Called when a file is to be opened for reading from the remote server.
      def open_file(entry)
        update_progress(:open, entry)
        request = sftp.open(entry.remote, &method(:on_open))
        request[:entry] = entry
      end

      # Called when a directory handle is closed.
      def on_closedir(response)
        @active -= 1
        entry = response.request[:parent]
        raise StatusException.new(response, "close #{entry.remote}") unless response.ok?
        process_next_entry
      end

      # Called when a file has been opened. This will call #download_next_chunk
      # to initiate the data transfer. The sink is the local IO object itself
      # when one was given, otherwise a freshly opened local file.
      def on_open(response)
        entry = response.request[:entry]
        raise StatusException.new(response, "open #{entry.remote}") unless response.ok?

        entry.handle = response[:handle]
        entry.sink = entry.local.respond_to?(:write) ? entry.local : ::File.open(entry.local, "wb")
        entry.offset = 0

        download_next_chunk(entry)
      end

      # Initiates a read of the next #read_size bytes from the file.
      def download_next_chunk(entry)
        request = sftp.read(entry.handle, entry.offset, read_size, &method(:on_read))
        request[:entry] = entry
        # remember where this chunk started, for the :get progress event
        request[:offset] = entry.offset
      end

      # Called when a read from a file finishes. If the read was successful
      # and returned data, this will call #download_next_chunk to read the
      # next bit from the file. Otherwise the file will be closed.
      def on_read(response)
        entry = response.request[:entry]

        if response.eof?
          update_progress(:close, entry)
          # NOTE(review): the sink is closed even when it was a caller-supplied
          # IO object — matches existing behavior; confirm callers expect this.
          entry.sink.close
          request = sftp.close(entry.handle, &method(:on_close))
          request[:entry] = entry
        elsif !response.ok?
          raise StatusException.new(response, "read #{entry.remote}")
        else
          entry.offset += response[:data].bytesize
          update_progress(:get, entry, response.request[:offset], response[:data])
          entry.sink.write(response[:data])
          download_next_chunk(entry)
        end
      end

      # Called when a file handle is closed.
      def on_close(response)
        @active -= 1
        entry = response.request[:entry]
        raise StatusException.new(response, "close #{entry.remote}") unless response.ok?
        process_next_entry
      end

      # If a progress callback or object has been set, this will report
      # the progress to that callback or object. An object handler's on_*
      # method takes precedence over a generic #call.
      def update_progress(hook, *args)
        on = "on_#{hook}"
        if progress.respond_to?(on)
          progress.send(on, self, *args)
        elsif progress.respond_to?(:call)
          progress.call(hook, self, *args)
        end
      end
  end

end; end; end