module Backup
  module Syncer
    module Cloud
      class Error < Backup::Error; end

      class Base < Syncer::Base
        MUTEX = Mutex.new

        ##
        # Number of threads to use for concurrency.
        #
        # Default: 0 (no concurrency)
        attr_accessor :thread_count

        ##
        # Number of times to retry failed operations.
        #
        # Default: 10
        attr_accessor :max_retries

        ##
        # Time in seconds to pause before each retry.
        #
        # Default: 30
        attr_accessor :retry_waitsec

        def initialize(syncer_id = nil, &block)
          super
          instance_eval(&block) if block_given?

          @thread_count  ||= 0
          @max_retries   ||= 10
          @retry_waitsec ||= 30

          @path ||= "backups"
          @path = path.sub(/^\//, "")
        end

        def perform!
          log!(:started)
          @transfer_count = 0
          @unchanged_count = 0
          @skipped_count = 0
          @orphans = thread_count > 0 ? Queue.new : []

          directories.each { |dir| sync_directory(dir) }
          orphans_result = process_orphans

          Logger.info "\nSummary:"
          Logger.info "\s\sTransferred Files: #{@transfer_count}"
          Logger.info "\s\s#{orphans_result}"
          Logger.info "\s\sUnchanged Files: #{@unchanged_count}"
          Logger.warn "\s\sSkipped Files: #{@skipped_count}" if @skipped_count > 0
          log!(:finished)
        end

        private

        def sync_directory(dir)
          remote_base = path.empty? ? File.basename(dir) :
            File.join(path, File.basename(dir))
          Logger.info "Gathering remote data for '#{remote_base}'..."
          remote_files = get_remote_files(remote_base)

          Logger.info("Gathering local data for '#{File.expand_path(dir)}'...")
          local_files = LocalFile.find(dir, excludes)

          relative_paths = (local_files.keys | remote_files.keys).sort
          if relative_paths.empty?
            Logger.info "No local or remote files found"
          else
            Logger.info "Syncing..."
            sync_block = proc do |relative_path|
              local_file  = local_files[relative_path]
              remote_md5  = remote_files[relative_path]
              remote_path = File.join(remote_base, relative_path)
              sync_file(local_file, remote_path, remote_md5)
            end

            if thread_count > 0
              sync_in_threads(relative_paths, sync_block)
            else
              relative_paths.each(&sync_block)
            end
          end
        end

        def sync_in_threads(relative_paths, sync_block)
          queue = Queue.new
          queue << relative_paths.shift until relative_paths.empty?
          num_threads = [thread_count, queue.size].min
          Logger.info "\s\sUsing #{num_threads} Threads"
          threads = Array.new(num_threads) do
            Thread.new do
              loop do
                path = queue.shift(true) rescue nil
                path ? sync_block.call(path) : break
              end
            end
          end

          # abort if any thread raises an exception
          while threads.any?(&:alive?)
            if threads.any? { |thr| thr.status.nil? }
              threads.each(&:kill)
              Thread.pass while threads.any?(&:alive?)
              break
            end
            sleep num_threads * 0.1
          end
          threads.each(&:join)
        end

        # If an exception is raised in multiple threads, only the exception
        # raised in the first thread that Thread#join is called on will be
        # handled. So all exceptions are logged first with their details,
        # then a generic exception is raised.
        def sync_file(local_file, remote_path, remote_md5)
          if local_file && File.exist?(local_file.path)
            if local_file.md5 == remote_md5
              MUTEX.synchronize { @unchanged_count += 1 }
            else
              Logger.info("\s\s[transferring] '#{remote_path}'")
              begin
                cloud_io.upload(local_file.path, remote_path)
                MUTEX.synchronize { @transfer_count += 1 }
              rescue CloudIO::FileSizeError => err
                MUTEX.synchronize { @skipped_count += 1 }
                Logger.warn Error.wrap(err, "Skipping '#{remote_path}'")
              rescue => err
                Logger.error(err)
                raise Error, <<-EOS
                  Syncer Failed!
                  See the Retry [info] and [error] messages (if any)
                  for details on each failed operation.
                EOS
              end
            end
          elsif remote_md5
            @orphans << remote_path
          end
        end
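
        # Remote files with no local counterpart were collected as "orphans"
        # by #sync_file (into a thread-safe Queue when threading is used,
        # drained into an Array below). With `mirror` enabled they are
        # deleted from the remote in one batch; otherwise they are only
        # logged. Returns the summary line printed by #perform!.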
"Deleted Files: 0" : "Orphaned Files: 0" end if @orphans.is_a?(Queue) @orphans = Array.new(@orphans.size) { @orphans.shift } end if mirror Logger.info @orphans.map { |path| "\s\s[removing] '#{path}'" }.join("\n") begin cloud_io.delete(@orphans) "Deleted Files: #{@orphans.count}" rescue => err Logger.warn Error.wrap(err, "Delete Operation Failed") "Attempted to Delete: #{@orphans.count} " \ "(See log messages for actual results)" end else Logger.info @orphans.map { |path| "\s\s[orphaned] '#{path}'" }.join("\n") "Orphaned Files: #{@orphans.count}" end end end end end end