module IMW
  module Tools
    # A class to encapsulate transferring a resource from one URI to
    # another.
    #
    # A transfer is described by an action (+:cp+ or +:mv+), a source
    # and a destination.  The strategy used to perform the transfer is
    # selected from the schemes of the two resources and is named
    # <tt>"#{source_scheme}_to_#{destination_scheme}"</tt>, with local
    # resources always treated as the +file+ scheme.
    class Transferer
      # The action this Transferer is to take.
      #
      # @return [:cp, :mv]
      attr_reader :action

      # Set the action of this Transferer.
      #
      # Accepts +:cp+ or +:copy+ (normalized to +:cp+) and +:mv+,
      # +:move+ or +:mv!+ (normalized to +:mv+).  The argument is
      # converted with +to_sym+, so strings are accepted too.
      #
      # @param [:cp, :mv] the_action
      # @raise [IMW::ArgumentError] if +the_action+ is not recognized
      def action=(the_action)
        @action =
          case the_action.to_sym
          when :cp, :copy       then :cp
          when :mv, :move, :mv! then :mv
          else
            raise IMW::ArgumentError,
                  "action (#{the_action}) must be one of `cp' (or `copy') or `mv' (or `move' or `mv!'"
          end
      end

      # The source resource.
      #
      # @return [IMW::Resource]
      attr_reader :source

      # Set the source for this transferer.
      #
      # If +the_source+ is local, will check that it exists and raise
      # an error if not.
      #
      # The error message uses the current action, so the action
      # should be set before the source.
      #
      # @param [String, IMW::Resource] the_source
      def source=(the_source)
        resource = IMW.open(the_source)
        resource.should_exist!("Cannot #{action_verb}") if resource.is_local?
        @source = resource
      end

      # The destination resource.
      #
      # @return [IMW::Resource]
      attr_reader :destination

      # Set the destination for this transferer.
      #
      # If +the_destination+ is local, will check that its parent
      # directory exists and raise an error if not.
      #
      # @param [String, IMW::Resource] the_destination
      def destination=(the_destination)
        resource = IMW.open(the_destination)
        resource.dir.should_exist!("Cannot #{action_verb}") if resource.is_local?
        @destination = resource
      end

      # Instantiate a new transferer to take the given +action+ on
      # +source+ and +destination+.
      #
      # @param [:cp, :mv] action the action to take
      # @param [String, IMW::Resource] source
      # @param [String, IMW::Resource] destination
      # @raise [IMW::PathError] if both resources have the same URI
      def initialize(action, source, destination)
        # The action goes first: the other two setters use it to
        # build their error messages.
        self.action      = action
        self.source      = source
        self.destination = destination

        # NB: the message interpolates the +source+ argument as it was
        # given, not the opened resource.
        if self.source.uri.to_s == self.destination.uri.to_s
          raise IMW::PathError, "Source and destination have the same URI: #{source}"
        end
      end

      # Transfer source to destination.
      #
      # For local transfer, will raise errors unless the necessary
      # paths exist.
      #
      # @return the result of calling +reopen+ on the destination
      # @raise [IMW::NoMethodError] if no strategy exists for this
      #   pair of schemes
      def transfer!
        IMW.announce_if_verbose("#{action_gerund.capitalize} #{source} to #{destination}")
        send(transfer_method)
        destination.reopen
      end

      protected

      # Return the name of the method that should be used to transfer
      # +source+ to +destination+.
      #
      # @return [String]
      # @raise [IMW::NoMethodError] if this class defines no method
      #   for the source/destination scheme pair
      def transfer_method
        source_scheme      = source.is_local? ? 'file' : source.scheme
        destination_scheme = destination.is_local? ? 'file' : destination.scheme
        method_name        = "#{source_scheme}_to_#{destination_scheme}"

        # The strategy methods below are protected and +respond_to?+
        # ignores non-public methods unless +include_all+ is true, so
        # without the second argument every lookup would fail.
        unless respond_to?(method_name, true)
          raise IMW::NoMethodError, "Do not know how to #{action_verb} #{source} to #{destination}"
        end

        method_name
      end

      # Infinitive of the current action, for messages.
      def action_verb # :nodoc
        action == :cp ? "copy" : "move"
      end

      # Gerund of the current action, for messages.
      def action_gerund # :nodoc
        action == :cp ? "copying" : "moving"
      end

      #
      # Purely local file
      #

      # Copy or move between two local paths using FileUtils.
      # Copying a directory uses the recursive +cp_r+.
      def file_to_file
        fu_action = (action == :cp && source.is_directory?) ? :cp_r : action
        FileUtils.send(fu_action, source.path, destination.path)
      end

      #
      # HTTP
      #

      # Write the body read from the source to the local destination.
      # The file is opened in binary mode so the payload is written
      # without newline or encoding translation.
      def http_to_file
        File.open(destination.path, 'wb') { |f| f.write(source.read) }
      end

      #
      # S3
      #

      def file_to_s3
        IMW::Schemes::S3.put(source, destination)
      end

      def http_to_s3
        IMW::Schemes::S3.put(source, destination)
      end

      def s3_to_file
        IMW::Schemes::S3.get(source, destination)
      end

      def s3_to_s3
        IMW::Schemes::S3.copy(source, destination)
      end

      #
      # HDFS
      #

      def hdfs_to_hdfs
        IMW::Schemes::HDFS.fs(action, source.path, destination.path)
      end

      def file_to_hdfs
        IMW::Schemes::HDFS.fs(:put, source.path, destination.path)
      end

      def hdfs_to_file
        IMW::Schemes::HDFS.fs(:get, source.path, destination.path)
      end

      def s3_to_hdfs
        IMW::Schemes::HDFS.fs(action, source.s3n_url, destination.path)
      end

      def hdfs_to_s3
        IMW::Schemes::HDFS.fs(action, source.path, destination.s3n_url)
      end
    end
  end
end