# frozen_string_literal: true

require "backup/cloud_io/base"
require "fog"
require "digest/md5"

module Backup
  module CloudIO
    class CloudFiles < Base
      class Error < Backup::Error; end

      MAX_FILE_SIZE = 1024**3 * 5    # 5 GiB
      MAX_SLO_SIZE = 1024**3 * 5000  # 1000 segments @ 5 GiB
      SEGMENT_BUFFER = 1024**2       # 1 MiB

      attr_reader :username, :api_key, :auth_url, :region, :servicenet,
        :container, :segments_container, :segment_size, :days_to_keep,
        :fog_options

      def initialize(options = {})
        super

        @username           = options[:username]
        @api_key            = options[:api_key]
        @auth_url           = options[:auth_url]
        @region             = options[:region]
        @servicenet         = options[:servicenet]
        @container          = options[:container]
        @segments_container = options[:segments_container]
        @segment_size       = options[:segment_size]
        @days_to_keep       = options[:days_to_keep]
        @fog_options        = options[:fog_options]
      end

      # The Syncer may call this method in multiple threads,
      # but #objects is always called before this occurs.
      def upload(src, dest)
        create_containers

        file_size = File.size(src)
        segment_bytes = segment_size * 1024**2
        if segment_bytes > 0 && file_size > segment_bytes
          raise FileSizeError, <<-EOS if file_size > MAX_SLO_SIZE
            File Too Large
            File: #{src}
            Size: #{file_size}
            Max SLO Size is #{MAX_SLO_SIZE} (5 GiB * 1000 segments)
          EOS

          segment_bytes = adjusted_segment_bytes(segment_bytes, file_size)
          segments = upload_segments(src, dest, segment_bytes, file_size)
          upload_manifest(dest, segments)
        else
          raise FileSizeError, <<-EOS if file_size > MAX_FILE_SIZE
            File Too Large
            File: #{src}
            Size: #{file_size}
            Max File Size is #{MAX_FILE_SIZE} (5 GiB)
          EOS

          put_object(src, dest)
        end
      end

      # Returns all objects in the container with the given prefix.
      #
      # - #get_container returns a max of 10000 objects per request.
      # - Returns objects sorted using a sqlite binary collating function.
      # - If marker is given, only objects after the marker are in the response.
      def objects(prefix)
        objects = []
        resp = nil
        prefix = prefix.chomp("/")
        opts = { prefix: prefix + "/" }

        create_containers

        while resp.nil? || resp.body.count == 10_000
          opts[:marker] = objects.last.name unless objects.empty?
          with_retries("GET '#{container}/#{prefix}/*'") do
            resp = connection.get_container(container, opts)
          end
          resp.body.each do |obj_data|
            objects << Object.new(self, obj_data)
          end
        end

        objects
      end

      # Used by Object to fetch metadata if needed.
      def head_object(object)
        resp = nil
        with_retries("HEAD '#{container}/#{object.name}'") do
          resp = connection.head_object(container, object.name)
        end
        resp
      end

      # Delete non-SLO object(s) from the container.
      #
      # - Called by the Storage (with objects) and the Syncer (with names).
      # - Deletes 10,000 objects per request.
      # - Missing objects will be ignored.
      def delete(objects_or_names)
        names = Array(objects_or_names).dup
        names.map!(&:name) if names.first.is_a?(Object)

        until names.empty?
          names_partial = names.slice!(0, 10_000)
          with_retries("DELETE Multiple Objects") do
            resp = connection.delete_multiple_objects(container, names_partial)
            resp_status = resp.body["Response Status"]
            raise Error, <<-EOS unless resp_status == "200 OK"
              #{resp_status}
              The server returned the following:
              #{resp.body.inspect}
            EOS
          end
        end
      end
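
      # For reference, #delete above accepts either Object instances (as the
      # Storage passes) or plain name strings (as the Syncer passes). A minimal
      # sketch, using a hypothetical `cloud_io` instance and prefix:
      #
      #   cloud_io.delete(cloud_io.objects("backups/my_trigger"))  # Object instances
      #   cloud_io.delete("backups/my_trigger/archive.tar")        # name string(s)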

      # Delete SLO object(s) from the container.
      #
      # - Used only by the Storage. The Syncer cannot use SLOs.
      # - Removes the SLO manifest object and all associated segments.
      # - Missing segments will be ignored.
      def delete_slo(objects)
        Array(objects).each do |object|
          with_retries("DELETE SLO Manifest '#{container}/#{object.name}'") do
            resp = connection.delete_static_large_object(container, object.name)
            resp_status = resp.body["Response Status"]
            raise Error, <<-EOS unless resp_status == "200 OK"
              #{resp_status}
              The server returned the following:
              #{resp.body.inspect}
            EOS
          end
        end
      end

      private

      def connection
        @connection ||= Fog::Storage.new({
          provider: "Rackspace",
          rackspace_username: username,
          rackspace_api_key: api_key,
          rackspace_auth_url: auth_url,
          rackspace_region: region,
          rackspace_servicenet: servicenet
        }.merge(fog_options || {}))
      end

      def create_containers
        return if @containers_created
        @containers_created = true

        with_retries("Create Containers") do
          connection.put_container(container)
          connection.put_container(segments_container) if segments_container
        end
      end

      def put_object(src, dest)
        opts = headers.merge("ETag" => Digest::MD5.file(src).hexdigest)
        with_retries("PUT '#{container}/#{dest}'") do
          File.open(src, "r") do |file|
            connection.put_object(container, dest, file, opts)
          end
        end
      end

      # Each segment is uploaded using chunked transfer encoding, streaming
      # SEGMENT_BUFFER bytes at a time, and each segment's MD5 is sent to
      # verify the transfer. Each segment's MD5 and byte_size will also be
      # verified when the SLO manifest object is uploaded.
      def upload_segments(src, dest, segment_bytes, file_size)
        total_segments = (file_size / segment_bytes.to_f).ceil
        progress = (0.1..0.9).step(0.1).map { |n| (total_segments * n).floor }
        Logger.info "\s\sUploading #{total_segments} SLO Segments..."

        segments = []
        File.open(src, "r") do |file|
          segment_number = 0
          until file.eof?
            segment_number += 1
            object = "#{dest}/#{segment_number.to_s.rjust(4, "0")}"
            pos = file.pos
            md5 = segment_md5(file, segment_bytes)
            opts = headers.merge("ETag" => md5)

            with_retries("PUT '#{segments_container}/#{object}'") do
              file.seek(pos)
              offset = 0
              connection.put_object(segments_container, object, nil, opts) do
                # block is called to stream data until it returns ''
                data = ""
                if offset <= segment_bytes - SEGMENT_BUFFER
                  data = file.read(SEGMENT_BUFFER).to_s # nil => ''
                  offset += data.size
                end
                data
              end
            end

            segments << {
              path: "#{segments_container}/#{object}",
              etag: md5,
              size_bytes: file.pos - pos
            }

            if (i = progress.rindex(segment_number))
              Logger.info "\s\s...#{i + 1}0% Complete..."
            end
          end
        end

        segments
      end

      def segment_md5(file, segment_bytes)
        md5 = Digest::MD5.new
        offset = 0
        while offset <= segment_bytes - SEGMENT_BUFFER
          data = file.read(SEGMENT_BUFFER)
          break unless data

          offset += data.size
          md5 << data
        end
        md5.hexdigest
      end

      # Each segment's ETag and byte_size will be verified once uploaded.
      # The request will raise an exception if verification fails or segments
      # are not found. However, each segment's ETag was verified when we
      # uploaded the segments, so this should only retry failed requests.
      def upload_manifest(dest, segments)
        Logger.info "\s\sStoring SLO Manifest '#{container}/#{dest}'"
        with_retries("PUT SLO Manifest '#{container}/#{dest}'") do
          connection.put_static_obj_manifest(container, dest, segments, headers)
        end
      end
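
      # For reference, each entry in the `segments` list passed to
      # #put_static_obj_manifest above is built by #upload_segments in the
      # form below (the values shown are hypothetical):
      #
      #   { path: "my_segments_container/backups/archive.tar/0001",
      #     etag: "d41d8cd98f00b204e9800998ecf8427e",
      #     size_bytes: 5_242_880 }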

      # If :days_to_keep was set, each object will be scheduled for deletion.
      # This includes non-SLO objects, the SLO manifest and all segments.
      def headers
        headers = {}
        headers["X-Delete-At"] = delete_at if delete_at
        headers
      end

      def delete_at
        return unless days_to_keep

        @delete_at ||= (Time.now.utc + days_to_keep * 60**2 * 24).to_i
      end

      def adjusted_segment_bytes(segment_bytes, file_size)
        return segment_bytes if file_size / segment_bytes.to_f <= 1000

        mb = orig_mb = segment_bytes / 1024**2
        mb += 1 until file_size / (1024**2 * mb).to_f <= 1000
        Logger.warn Error.new(<<-EOS)
          Segment Size Adjusted
          Your original #segment_size of #{orig_mb} MiB has been adjusted
          to #{mb} MiB in order to satisfy the limit of 1000 segments.
          To enforce your chosen #segment_size, you should use the Splitter.
          e.g. split_into_chunks_of #{mb * 1000} (#segment_size * 1000)
        EOS
        1024**2 * mb
      end

      class Object
        attr_reader :name, :hash

        def initialize(cloud_io, data)
          @cloud_io = cloud_io
          @name = data["name"]
          @hash = data["hash"]
        end

        def slo?
          !!metadata["X-Static-Large-Object"]
        end

        def marked_for_deletion?
          !!metadata["X-Delete-At"]
        end

        private

        def metadata
          @metadata ||= @cloud_io.head_object(self).headers
        end
      end
    end
  end
end
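
# A minimal usage sketch (not part of the library; all option values below are
# hypothetical). This object is normally constructed by the Storage or Syncer,
# but driving it directly looks roughly like this:
#
#   cloud_io = Backup::CloudIO::CloudFiles.new(
#     username: "my-rackspace-user",
#     api_key: "my-rackspace-api-key",
#     auth_url: "https://identity.api.rackspacecloud.com/v2.0",
#     region: "ord",
#     container: "my_container",
#     segments_container: "my_segments_container",
#     segment_size: 5,   # MiB; files larger than this are uploaded as SLOs
#     days_to_keep: 30   # sets X-Delete-At on each uploaded object
#   )
#
#   cloud_io.upload("/tmp/backup.tar", "backups/backup.tar")
#   cloud_io.objects("backups").each { |obj| puts obj.name }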