lib/mir/disk/amazon.rb in mir-0.1.2 vs lib/mir/disk/amazon.rb in mir-0.1.3

- old
+ new

require "right_aws"
require "tempfile"
require "digest/md5"

module Mir
  module Disk
    # Remote disk backed by a single Amazon S3 bucket. Files no larger than the
    # chunk size are stored as one object; larger files are split into numbered
    # part objects named via Mir::Utils.filename_with_sequence.
    class Amazon

      # This is the default size in bytes at which files will be split and stored
      # on S3. From trial and error, 5MB seems to be a good default size for chunking
      # large files.
      DEFAULT_CHUNK_SIZE = 5*(2**20)

      attr_reader :bucket_name, :connection, :chunk_size

      #
      # Converts a path name to a key that can be stored on S3
      #
      # @param [String] the path to the file
      # @return [String] the path with a single leading separator removed, if present
      def self.s3_key(path)
        path.start_with?(File::SEPARATOR) ? path[1..-1] : path
      end

      # @param [Hash] settings; reads :bucket_name, :access_key_id,
      #   :secret_access_key and the optional :chunk_size
      # @raise [ArgumentError] when :chunk_size is given but is not a positive number
      def initialize(settings = {})
        @bucket_name = settings[:bucket_name]
        @access_key_id = settings[:access_key_id]
        @secret_access_key = settings[:secret_access_key]
        # Routed through the validating writer: a chunk size of 0 would make
        # #write loop forever, because IO#read(0) always returns "".
        self.chunk_size = settings[:chunk_size] || DEFAULT_CHUNK_SIZE
        @connection = try_connect
      end

      # Returns the buckets available from S3
      # NOTE(review): the call below is carried over unchanged; list_bucket is
      # invoked without a bucket name and select is given a symbol rather than a
      # block -- confirm this does what the comment above says.
      def collections
        @connection.list_bucket.select(:key)
      end

      # Sets the size in bytes at which files are split into parts
      #
      # @param [Numeric] the new chunk size, must be greater than zero
      # @raise [ArgumentError] when the value is not a positive number
      def chunk_size=(n)
        unless n.is_a?(Numeric) && n > 0
          raise ArgumentError, "chunk size must be a positive number"
        end
        @chunk_size = n
      end

      # Whether the key exists in S3
      #
      # @param [String] the S3 key name
      # @return [Boolean] false when the HEAD request raises RightAws::AwsError
      def key_exists?(key)
        connection.head(bucket_name, key)
        true
      rescue RightAws::AwsError
        false
      end

      # Copies the remote resource to the local filesystem
      # @param [String] the remote name of the resource to copy
      # @param [String] the local name of the destination
      # @raise [Disk::RemoteFileNotFound] when neither the key nor its first part exists
      def copy(from, dest)
        key = self.class.s3_key(from)
        # MultiPartFile#get opens and closes dest itself, so it is not opened here.
        MultiPartFile.new(self, key).get(dest)
      end

      # Retrieves the complete object from S3 without streaming
      def read(key)
        connection.get_object(bucket_name, key)
      end

      # Whether the S3 interface object was created without raising
      def connected?
        @connection_success
      end

      def volume
        connection.bucket(bucket_name, true)
      end

      # Deletes the remote version of the file
      # @return [Boolean] false when the remote resource cannot be found, otherwise
      #   the result of the underlying delete call(s)
      def delete(file_path)
        key = self.class.s3_key(file_path)
        Mir.logger.info "Deleting remote object #{file_path}"

        begin
          remote_file = MultiPartFile.new(self, key)
        rescue Disk::RemoteFileNotFound
          Mir.logger.warn "Could not find remote resource '#{key}'"
          return false
        end

        if remote_file.multipart?
          delete_parts(key)
        else
          connection.delete(bucket_name, key)
        end
      end

      # Writes a file to Amazon S3. If the file size exceeds the chunk size, the file will
      # be written in chunks
      #
      # @param [String] the absolute path of the file to be written
      # @raise [Disk::IncompleteTransmission] raised when remote resource is different from local file
      def write(file_path)
        key = self.class.s3_key(file_path)

        if File.size(file_path) <= chunk_size
          upload(file_path, key)
        else
          delete_parts(file_path) # clean up remaining part files if any exist

          File.open(file_path, "rb") do |source|
            part_id = 1
            # IO#read(length) returns nil at end of file, which ends the loop
            while (part = source.read(chunk_size))
              part_name = Mir::Utils.filename_with_sequence(key, part_id)
              Mir.logger.debug "Writing part #{part_name}"

              temp_file(part_name) do |tmp|
                tmp.binmode
                tmp.write(part)
                tmp.flush # make the bytes visible to the separate read handle
                upload(tmp.path, part_name)
              end

              part_id += 1
            end
          end
        end
        Mir.logger.info "Completed upload #{file_path}"
      end

      private

      # Uploads one local file as one S3 object and verifies it afterwards.
      # File.open is used in block form so the handle is always closed, and in
      # place of Kernel#open so the path is never interpreted as a command.
      #
      # @param [String] path of the local file to send
      # @param [String] the S3 key to store it under
      # @raise [Disk::IncompleteTransmission] when the remote MD5 differs from the local one
      def upload(local_path, key)
        File.open(local_path, "rb") do |io|
          connection.put(bucket_name, key, io)
        end
        raise Disk::IncompleteTransmission unless equals?(local_path, key)
      end

      # Determines whether a local file matches the remote file
      #
      # @param [String] the complete path name to the file
      # @param [String] the S3 key name for the object
      # @return [Boolean] whether the MD5 hash of the local file matches the remote value
      def equals?(filename, key)
        # A HEAD request returns the response headers without the object body.
        headers = connection.head(bucket_name, key)
        # Take the 32 hex digits out of the ETag rather than relying on fixed
        # offsets, so the comparison does not depend on how the value is quoted.
        remote_md5 = headers["etag"].to_s[/\h{32}/]
        Digest::MD5.file(filename).to_s == remote_md5
      end

      # Builds the S3 interface and records whether that succeeded.
      #
      # @return [RightAws::S3Interface, nil] nil when construction raised
      def try_connect
        conn = RightAws::S3Interface.new(@access_key_id, @secret_access_key, {
          :multi_thread => true,
          :logger => Mir.logger
        })
        @connection_success = true
        conn
      rescue StandardError => e
        @connection_success = false
        Mir.logger.error "Could not establish connection with S3: '#{e.message}'"
        nil
      end

      # Yields a temp file object that is immediately discarded after use
      #
      # @param [String] the filename
      # @yields [Tempfile]
      def temp_file(name)
        file = Tempfile.new(File.basename(name))
        begin
          yield file
        ensure
          file.close
          file.unlink
        end
      end

      # Used to delete a file that has been broken into chunks
      #
      # NOTE(review): objects are selected by key prefix, so any other key that
      # merely starts with the same string is deleted as well -- confirm that
      # callers never store such keys.
      #
      # @return [Boolean] true if every delete call reported success
      def delete_parts(file_path)
        flag = true
        connection.incrementally_list_bucket(bucket_name,
                                             { :prefix => self.class.s3_key(file_path),
                                               :max_keys => 100 }) do |group|

          group[:contents].each do |item|
            if connection.delete(bucket_name, item[:key])
              Mir.logger.debug("Deleted '#{item[:key]}'")
            else
              flag = false
            end
          end
        end
        flag
      end
    end

    # Used to hide the inner details of multipart file uploads and downloads. Downloads
    # are written so that errors raised while transferring are logged rather than
    # propagated, as they may otherwise be swallowed further up the stack by worker threads
    class MultiPartFile

      # @param [Disk] the remote disk
      # @param [String] the name of the resource
      # @raise [Disk::RemoteFileNotFound] when neither the plain key nor the first part exists
      def initialize(disk, name)
        @disk, @name = disk, name
        multiname = Utils.filename_with_sequence(name, 1)

        if disk.key_exists?(name)
          @multipart = false
        elsif disk.key_exists?(multiname)
          @multipart = true
        else
          raise Disk::RemoteFileNotFound
        end
      end

      attr_reader :disk, :name

      # Whether the resource is broken into chunks on the remote store
      def multipart?
        @multipart
      end

      # Downloads the resource to the destination. If the file is stored in parts it is
      # downloaded sequentially in pieces, stopping at the first sequence number
      # whose key does not exist
      #
      # @param [String] the local path to write to
      def get(dest)
        File.open(dest, "wb") do |output|
          begin
            if multipart?
              seq = 1
              loop do
                part = Utils.filename_with_sequence(name, seq)
                break unless part && disk.key_exists?(part)
                output.write disk.read(part)
                seq += 1
              end
            else
              output.write disk.read(name)
            end
          rescue StandardError => e
            # Best effort by design: log and carry on, but let signals and
            # SystemExit through.
            Mir.logger.error e
          end
        end
      end
    end

  end
end