require 'net/ssh/loggable'

module Net; module SFTP; module Operations

  # A general purpose uploader module for Net::SFTP. It can upload IO objects,
  # files, and even entire directory trees via SFTP, and provides a flexible
  # progress reporting mechanism.
  #
  # To upload a single file to the remote server, simply specify both the
  # local and remote paths:
  #
  #   uploader = sftp.upload("/path/to/local.txt", "/path/to/remote.txt")
  #
  # By default, this operates asynchronously, so if you want to block until
  # the upload finishes, you can use the 'bang' variant:
  #
  #   sftp.upload!("/path/to/local.txt", "/path/to/remote.txt")
  #
  # Or, if you have multiple uploads that you want to run in parallel, you can
  # employ the #wait method of the returned object:
  #
  #   uploads = %w(file1 file2 file3).map { |f| sftp.upload(f, "remote/#{f}") }
  #   uploads.each { |u| u.wait }
  #
  # To upload an entire directory tree, recursively, simply pass the directory
  # path as the first parameter:
  #
  #   sftp.upload!("/path/to/directory", "/path/to/remote")
  #
  # This will upload the contents of "/path/to/directory", its subdirectories,
  # and their contents, recursively, into "/path/to/remote" on the remote
  # server. Remote subdirectories are always created as they are encountered,
  # but the remote root directory itself is only created when a truthy :mkdir
  # option is given:
  #
  #   sftp.upload!("/path/to/directory", "/path/to/remote", :mkdir => true)
  #
  # When :mkdir is omitted or false, no mkdir request is issued for the remote
  # root, so it is presumably expected to exist already.
  #
  # If you want to send data to a file on the remote server, but the data is
  # in memory, you can pass an IO object and upload its contents:
  #
  #   require 'stringio'
  #   io = StringIO.new(data)
  #   sftp.upload!(io, "/path/to/remote")
  #
  # The following options are supported:
  #
  # * <tt>:progress</tt> - either a block or an object to act as a progress
  #   callback. See the discussion of "progress monitoring" below.
  # * <tt>:requests</tt> - the number of pending SFTP requests to allow at
  #   any given time. When uploading an entire directory tree recursively,
  #   this will default to 16, otherwise it will default to 2. Setting this
  #   higher might improve throughput. Reducing it will reduce throughput.
  # * <tt>:read_size</tt> - the maximum number of bytes to read at a time
  #   from the source. Increasing this value might improve throughput. It
  #   defaults to 32,000 bytes.
  # * <tt>:name</tt> - the filename to report to the progress monitor when
  #   an IO object is given as +local+. This defaults to "".
  # * <tt>:mkdir</tt> - when truthy and +local+ is a directory, the remote
  #   root directory is created before anything is uploaded into it.
  # * <tt>:properties</tt> - a hash used as the initial property store for
  #   the uploader (see #[] and #[]=). Defaults to an empty hash.
  #
  # == Progress Monitoring
  #
  # Sometimes it is desirable to track the progress of an upload. There are
  # two ways to do this: either using a callback block, or a special custom
  # object.
  #
  # Using a block it's pretty straightforward:
  #
  #   sftp.upload!("local", "remote") do |event, uploader, *args|
  #     case event
  #     when :open then
  #       # args[0] : file metadata
  #       puts "starting upload: #{args[0].local} -> #{args[0].remote} (#{args[0].size} bytes)"
  #     when :put then
  #       # args[0] : file metadata
  #       # args[1] : byte offset in remote file
  #       # args[2] : data being written (as string)
  #       puts "writing #{args[2].length} bytes to #{args[0].remote} starting at #{args[1]}"
  #     when :close then
  #       # args[0] : file metadata
  #       puts "finished with #{args[0].remote}"
  #     when :mkdir then
  #       # args[0] : remote path name
  #       puts "creating directory #{args[0]}"
  #     when :finish then
  #       puts "all done!"
  #     end
  #   end
  #
  # However, for more complex implementations (e.g., GUI interfaces and such)
  # a block can become cumbersome. In those cases, you can create custom
  # handler objects that respond to certain methods, and then pass your handler
  # to the uploader:
  #
  #   class CustomHandler
  #     def on_open(uploader, file)
  #       puts "starting upload: #{file.local} -> #{file.remote} (#{file.size} bytes)"
  #     end
  #
  #     def on_put(uploader, file, offset, data)
  #       puts "writing #{data.length} bytes to #{file.remote} starting at #{offset}"
  #     end
  #
  #     def on_close(uploader, file)
  #       puts "finished with #{file.remote}"
  #     end
  #
  #     def on_mkdir(uploader, path)
  #       puts "creating directory #{path}"
  #     end
  #
  #     def on_finish(uploader)
  #       puts "all done!"
  #     end
  #   end
  #
  #   sftp.upload!("local", "remote", :progress => CustomHandler.new)
  #
  # If you omit any of those methods, the progress updates for those missing
  # events will be ignored. You can create a catchall method named "call" for
  # those, instead.
  class Upload
    include Net::SSH::Loggable

    # The source of the upload (on the local server)
    attr_reader :local

    # The destination of the upload (on the remote server)
    attr_reader :remote

    # The hash of options that were given when the object was instantiated
    attr_reader :options

    # The SFTP session object used by this upload instance
    attr_reader :sftp

    # The properties hash for this object
    attr_reader :properties

    # Instantiates a new uploader process on top of the given SFTP session.
    # +local+ is either an IO object containing data to upload, or a string
    # identifying a file or directory on the local host. +remote+ is a string
    # identifying the location on the remote host that the upload should
    # target.
    #
    # Raises ArgumentError if +local+ neither responds to +read+ nor names an
    # existing local path.
    #
    # This will return immediately, and requires that the SSH event loop be
    # run in order to effect the upload. (See #wait.)
    def initialize(sftp, local, remote, options={}, &progress) #:nodoc:
      @sftp = sftp
      @local = local
      @remote = remote
      @progress = progress || options[:progress]
      @options = options
      @properties = options[:properties] || {}
      @active = 0

      self.logger = sftp.logger

      @uploads = []
      # Anything that responds to +read+ is treated as an IO source and is
      # never walked as a directory.
      @recursive = local.respond_to?(:read) ? false : ::File.directory?(local)

      if recursive?
        @stack = [entries_for(local)]
        @local_cwd = local
        @remote_cwd = remote

        if @options[:mkdir]
          # The pending mkdir counts as an active request, so that #active?
          # stays true until the server has answered.
          @active += 1
          sftp.mkdir(remote) do |response|
            @active -= 1
            raise StatusException.new(response, "mkdir `#{remote}'") unless response.ok?
            start_recursive_readers
          end
        else
          # No remote root to create: start the readers straight away, with
          # the same degree of parallelism as the :mkdir path.
          start_recursive_readers
        end
      else
        raise ArgumentError, "expected a file to upload" unless local.respond_to?(:read) || ::File.exist?(local)
        @stack = [[local]]
        process_next_entry
      end
    end

    # Returns true if a directory tree is being uploaded, and false if only a
    # single file is being uploaded.
    def recursive?
      @recursive
    end

    # Returns true if the uploader is currently running. When this is false,
    # the uploader has finished processing. The uploader counts as running
    # while requests are pending or entries remain on the stack.
    def active?
      @active > 0 || @stack.any?
    end

    # Forces the transfer to stop by resetting the pending-request counter and
    # discarding the directory stack and the list of open uploads. It does not
    # close any IO objects, nor does it cancel requests that were already sent.
    def abort!
      @active = 0
      @stack.clear
      @uploads.clear
    end

    # Blocks until the upload has completed, by running the SFTP session's
    # loop for as long as #active? is true. Returns the uploader itself.
    def wait
      sftp.loop { active? }
      self
    end

    # Returns the property with the given name. This allows Upload instances
    # to store their own state when used as part of a state machine.
    def [](name)
      @properties[name.to_sym]
    end

    # Sets the given property to the given value. This allows Upload instances
    # to store their own state when used as part of a state machine.
    def []=(name, value)
      @properties[name.to_sym] = value
    end

    private

      #--
      # "ruby -w" hates private attributes, so we have to do this longhand.
      #++

      # The progress handler for this instance. Possibly nil.
      def progress; @progress; end

      # A simple struct for recording metadata about the file currently being
      # uploaded.
      LiveFile = Struct.new(:local, :remote, :io, :size, :handle)

      # The default # of bytes to read from disk at a time.
      DEFAULT_READ_SIZE = 32_000

      # The number of readers to use when uploading a single file.
      SINGLE_FILE_READERS = 2

      # The number of readers to use when uploading a directory.
      RECURSIVE_READERS = 16

      # Starts the state machine for a recursive upload by calling
      # #process_next_entry up to :requests (default RECURSIVE_READERS) times,
      # stopping early once it reports that nothing is left to start. At least
      # one call is always made, so a zero or negative :requests value cannot
      # leave entries on the stack with no request in flight.
      def start_recursive_readers
        readers = [(options[:requests] || RECURSIVE_READERS).to_i, 1].max
        readers.times { break unless process_next_entry }
      end

      # Examines the stack and determines what action to take. This is the
      # starting point of the state machine.
      #
      # Returns true if a new entry (file or directory) was started, and false
      # if the stack was already exhausted.
      def process_next_entry
        if @stack.empty?
          if @uploads.any?
            # Nothing new to start; keep an already-open upload moving.
            write_next_chunk(@uploads.first)
          elsif !active?
            update_progress(:finish)
          end
          return false
        elsif @stack.last.empty?
          # The current directory is exhausted: step back up to its parent and
          # report whatever the parent level manages to start. Propagating the
          # result (rather than always answering true) keeps callers from
          # invoking this again on an exhausted stack, which would report
          # :finish a second time.
          @stack.pop
          @local_cwd = ::File.dirname(@local_cwd)
          @remote_cwd = ::File.dirname(@remote_cwd)
          return process_next_entry
        elsif recursive?
          entry = @stack.last.shift
          lpath = ::File.join(@local_cwd, entry)
          rpath = ::File.join(@remote_cwd, entry)

          if ::File.directory?(lpath)
            # Descend into the subdirectory and create its remote counterpart.
            @stack.push(entries_for(lpath))
            @local_cwd = lpath
            @remote_cwd = rpath

            @active += 1
            update_progress(:mkdir, rpath)
            request = sftp.mkdir(rpath, &method(:on_mkdir))
            request[:dir] = rpath
          else
            open_file(lpath, rpath)
          end
        else
          # Single-file upload: the stack holds one one-element list.
          open_file(@stack.pop.first, remote)
        end
        true
      end

      # Prepares to send +local+ to +remote+. +local+ is either an object that
      # responds to +read+, which is used as-is, or a path that is opened for
      # binary reading. Reports the :open progress event and issues the remote
      # open request.
      def open_file(local, remote)
        @active += 1

        if local.respond_to?(:read)
          file = local
          name = options[:name] || ""
        else
          file = ::File.open(local, "rb")
          name = local
        end

        # Prefer +stat+ when the source offers it; otherwise fall back to
        # +size+.
        if file.respond_to?(:stat)
          size = file.stat.size
        else
          size = file.size
        end

        metafile = LiveFile.new(name, remote, file, size)
        update_progress(:open, metafile)

        request = sftp.open(remote, "w", &method(:on_open))
        request[:file] = metafile
      end

      # Called when a +mkdir+ request finishes, successfully or otherwise.
      # If the request failed, this will raise a StatusException, otherwise
      # it will call #process_next_entry to continue the state machine.
      def on_mkdir(response)
        @active -= 1
        dir = response.request[:dir]
        raise StatusException.new(response, "mkdir #{dir}") unless response.ok?

        process_next_entry
      end

      # Called when an +open+ request finishes. Raises StatusException if the
      # open failed, otherwise it calls #write_next_chunk to begin sending
      # data to the remote server.
      def on_open(response)
        @active -= 1
        file = response.request[:file]
        raise StatusException.new(response, "open #{file.remote}") unless response.ok?

        file.handle = response[:handle]

        @uploads << file
        write_next_chunk(file)

        # A single-file upload gets its parallelism from several outstanding
        # writes on the one file: :requests (default SINGLE_FILE_READERS) more
        # chunks are queued on top of the one issued above.
        if !recursive?
          (options[:requests] || SINGLE_FILE_READERS).to_i.times { write_next_chunk(file) }
        end
      end

      # Called when a +write+ request finishes. Raises StatusException if the
      # write failed, otherwise it calls #write_next_chunk to continue the
      # write.
      def on_write(response)
        @active -= 1
        file = response.request[:file]
        raise StatusException.new(response, "write #{file.remote}") unless response.ok?

        write_next_chunk(file)
      end

      # Called when a +close+ request finishes. Raises a StatusException if the
      # close failed, otherwise it calls #process_next_entry to continue the
      # state machine.
      def on_close(response)
        @active -= 1
        file = response.request[:file]
        raise StatusException.new(response, "close #{file.remote}") unless response.ok?

        process_next_entry
      end

      # Attempts to send the next chunk from the given file (where +file+ is
      # a LiveFile instance). If the file's source has already been released,
      # control is handed back to #process_next_entry instead.
      def write_next_chunk(file)
        if file.io.nil?
          process_next_entry
        else
          @active += 1
          offset = file.io.pos
          data = file.io.read(options[:read_size] || DEFAULT_READ_SIZE)

          if data.nil?
            # Source exhausted: request the remote close, then release the
            # local source and drop the file from the list of open uploads.
            update_progress(:close, file)
            request = sftp.close(file.handle, &method(:on_close))
            request[:file] = file
            file.io.close
            file.io = nil
            @uploads.delete(file)
          else
            update_progress(:put, file, offset, data)
            request = sftp.write(file.handle, offset, data, &method(:on_write))
            request[:file] = file
          end
        end
      end

      # Returns all directory entries for the given path, removing the '.'
      # and '..' relative paths.
      def entries_for(local)
        ::Dir.entries(local).reject { |v| %w(. ..).include?(v) }
      end

      # Attempts to notify the progress monitor (if one was given) about
      # progress made for the given event. A handler method named after the
      # event (e.g. +on_open+) takes precedence; otherwise the monitor's
      # +call+ method is used if it has one. With neither, the event is
      # silently dropped.
      def update_progress(event, *args)
        on = "on_#{event}"
        if progress.respond_to?(on)
          progress.send(on, self, *args)
        elsif progress.respond_to?(:call)
          progress.call(event, self, *args)
        end
      end
  end

end; end; end